Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,25 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
defaultValue = "",
help = "Path to the native Amazon Kinesis Producer Library (KPL) binary.\n"
+ "Only use this setting if you want to use a custom build of the native code.\n"
+ "This setting can also be set with the environment variable `PULSAR_IO_KINESIS_KPL_PATH`.\n"
+ "This setting can also be set with the environment variable `PULSAR_IO_KINESIS_KPL_1_0_PATH`"
+ "or `PULSAR_IO_KINESIS_KPL_PATH`.\n"
+ "If not set, the Kinesis sink will use the built-in native executable."
)
private String nativeExecutable = System.getenv("PULSAR_IO_KINESIS_KPL_PATH");
private String nativeExecutable = resolveDefaultKinesisProducerLibraryPath();

private static String resolveDefaultKinesisProducerLibraryPath() {
// Prefer PULSAR_IO_KINESIS_KPL_1_0_PATH environment variable over PULSAR_IO_KINESIS_KPL_PATH.
// This setting supports building a Pulsar Functions base image that is used to run different Pulsar IO Kinesis
// sink versions. The older versions of Pulsar IO Kinesis sink can continue to use the binary configured with
// PULSAR_IO_KINESIS_KPL_PATH, pointing to a 0.15.12 native executable. The newer versions of Pulsar IO Kinesis
// sink can use the binary configured with PULSAR_IO_KINESIS_KPL_1_0_PATH, pointing to a 1.0.4
// native executable.
String kplPath = System.getenv("PULSAR_IO_KINESIS_KPL_1_0_PATH");
if (isNotBlank(kplPath)) {
return kplPath;
}
return System.getenv("PULSAR_IO_KINESIS_KPL_PATH");
}

public static KinesisSinkConfig load(Map<String, Object> config, SinkContext sinkContext) {
KinesisSinkConfig kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext);
Expand Down
Loading