This repository contains examples of custom deserializers that can be used in Conduktor through its plugin mechanism.
You can download the latest jar containing these deserializers: my_custom_deserializers_2.13-2.3.0.jar
To learn how to use the "custom deserializer" feature, see the Conduktor documentation.
tunein.kafka.messages.MyCustomDeserializer
This deserializer transforms the data (bytes) it receives from Kafka into a String (text),
then checks whether it matches the following format:
-- this is the serialized data
- If the message received from Kafka starts with a --<space> character sequence followed by some text, it creates a new instance of a data structure named MyMessage, which contains a single field named value of type String, as follows:
  MyMessage(value = "this is the serialized data")
  In Conduktor, this data structure will be interpreted and displayed as JSON:
  { "value": "this is the serialized data" }
- If the message received from Kafka doesn't match the expected format, the deserializer fails with an error message:
  Invalid format received for message: <the received message>
This simple example demonstrates two things:
- What happens when a custom deserializer fails to deserialize a message.
- What a simplified "deserialization" looks like (the message has to respect a certain format so that the deserializer can extract the data).
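The behavior described above can be pictured with a minimal Java sketch. It is not the code shipped in the jar: the class name MyCustomDeserializerSketch, the use of UTF-8, and the IllegalArgumentException are assumptions made for illustration, and the method only mirrors the shape of Kafka's Deserializer#deserialize(String, byte[]) so that the example runs without the Kafka client on the classpath.

```java
import java.nio.charset.StandardCharsets;

final class MyCustomDeserializerSketch {

    // Hypothetical stand-in for MyMessage: one field named value, of type String.
    static final class MyMessage {
        final String value;

        MyMessage(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "MyMessage(value = \"" + value + "\")";
        }
    }

    // The expected marker: two dashes followed by one space.
    static final String PREFIX = "-- ";

    // Same parameter shape as Kafka's Deserializer#deserialize(String topic, byte[] data).
    static MyMessage deserialize(String topic, byte[] data) {
        // Assumption: the payload is UTF-8 encoded text.
        String text = new String(data, StandardCharsets.UTF_8);
        // Accept only the prefix followed by at least one character of text.
        if (text.startsWith(PREFIX) && text.length() > PREFIX.length()) {
            return new MyMessage(text.substring(PREFIX.length()));
        }
        // Assumption: the real deserializer may throw a different exception type.
        throw new IllegalArgumentException("Invalid format received for message: " + text);
    }

    public static void main(String[] args) {
        byte[] valid = "-- this is the serialized data".getBytes(StandardCharsets.UTF_8);
        // prints: MyMessage(value = "this is the serialized data")
        System.out.println(deserialize("my-topic", valid));

        byte[] invalid = "not a valid payload".getBytes(StandardCharsets.UTF_8);
        try {
            deserialize("my-topic", invalid);
        } catch (IllegalArgumentException e) {
            // prints: Invalid format received for message: not a valid payload
            System.out.println(e.getMessage());
        }
    }
}
```

In a real plugin the same logic would live in a class implementing org.apache.kafka.common.serialization.Deserializer, and the thrown exception is what surfaces as the failure when a record cannot be deserialized.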
For a simple example that returns constants, see the tunein.kafka.messages.MyConfigurableDeserializer section below.
tunein.kafka.messages.MyCustomProtobufDeserializer
This example deserializes a protobuf payload corresponding to this schema:
message Person {
  required string name = 1;
  required int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    required string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phones = 4;
}
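To give a feel for what such a payload looks like on the wire, here is a minimal Java sketch that hand-decodes the top-level fields of a Person message. It is an illustration only: the actual deserializer presumably relies on classes generated by the protobuf compiler, and the class name PersonWireSketch and its fields are hypothetical. Nested PhoneNumber messages are only counted, not decoded.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PersonWireSketch {

    private final byte[] buf;
    private int pos = 0;

    String name;      // field 1 (length-delimited)
    int id;           // field 2 (varint)
    String email;     // field 3 (length-delimited), stays null when absent
    int phoneCount;   // number of field 4 entries seen (nested messages, not decoded here)

    PersonWireSketch(byte[] payload) {
        this.buf = payload;
        parse();
    }

    // Reads a base-128 varint: 7 payload bits per byte, least significant group first.
    private long readVarint() {
        long result = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            if (pos >= buf.length) {
                throw new IllegalArgumentException("Truncated varint");
            }
            byte b = buf[pos++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("Malformed varint");
    }

    // Reads a length prefix (varint) followed by that many raw bytes.
    private byte[] readLengthDelimited() {
        long len = readVarint();
        if (len < 0 || len > buf.length - pos) {
            throw new IllegalArgumentException("Truncated length-delimited field");
        }
        byte[] out = Arrays.copyOfRange(buf, pos, pos + (int) len);
        pos += (int) len;
        return out;
    }

    private void parse() {
        while (pos < buf.length) {
            long tag = readVarint();
            int field = (int) (tag >>> 3);   // upper bits: field number
            int wireType = (int) (tag & 7);  // lower 3 bits: 0 = varint, 2 = length-delimited
            if (field == 1 && wireType == 2) {
                name = new String(readLengthDelimited(), StandardCharsets.UTF_8);
            } else if (field == 2 && wireType == 0) {
                id = (int) readVarint();
            } else if (field == 3 && wireType == 2) {
                email = new String(readLengthDelimited(), StandardCharsets.UTF_8);
            } else if (field == 4 && wireType == 2) {
                readLengthDelimited(); // skip the nested PhoneNumber bytes
                phoneCount++;
            } else {
                throw new IllegalArgumentException(
                        "Unexpected field " + field + " with wire type " + wireType);
            }
        }
    }

    public static void main(String[] args) {
        // name = "Ada" (tag 0x0A, length 3), id = 42 (tag 0x10, varint 0x2A)
        byte[] payload = {0x0A, 0x03, 'A', 'd', 'a', 0x10, 0x2A};
        PersonWireSketch person = new PersonWireSketch(payload);
        // prints: Ada 42
        System.out.println(person.name + " " + person.id);
    }
}
```

Using generated classes instead of a hand-written decoder is the usual choice in practice, since they also handle defaults, enums, and nested messages.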
tunein.kafka.messages.MyConfigurableDeserializer
This example shows how a deserializer's configuration can change its behavior.
To select the behavior, the deserializer checks for an output property in its configuration.
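The dispatch on the output property can be pictured with the minimal Java sketch below, which covers the passthrough, config, and constant-output modes of this deserializer. The class name ConfigurableDeserializerSketch is hypothetical, the methods only mirror the shape of Kafka's Deserializer#configure and Deserializer#deserialize, and throwing on a missing or empty output is an assumption, since this README does not say what happens in that case.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigurableDeserializerSketch {

    private Map<String, Object> config = new HashMap<>();

    // Same parameter shape as Kafka's Deserializer#configure(Map<String, ?> configs, boolean isKey).
    void configure(Map<String, ?> configs, boolean isKey) {
        this.config = new HashMap<>(configs);
    }

    // Same parameter shape as Kafka's Deserializer#deserialize(String topic, byte[] data).
    Object deserialize(String topic, byte[] data) {
        Object raw = config.get("output");
        String output = raw == null ? "" : raw.toString();
        switch (output) {
            case "passthrough":
                // Return the record bytes untouched.
                return data;
            case "config":
                // Return the whole configuration for every record.
                return config;
            case "":
                // Assumption: behavior for a missing or empty output is not documented.
                throw new IllegalStateException("Missing or empty 'output' property");
            default:
                // Any other non-empty value is returned as a constant String.
                return output;
        }
    }

    public static void main(String[] args) {
        ConfigurableDeserializerSketch deserializer = new ConfigurableDeserializerSketch();
        Map<String, String> props = new HashMap<>();
        props.put("output", "some constant output");
        deserializer.configure(props, false);
        // prints: some constant output
        System.out.println(deserializer.deserialize("my-topic", new byte[] {1, 2, 3}));
    }
}
```

Returning Object keeps the sketch short; it reflects that the three modes produce values of different types (a byte array, a map, or a String).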
With configuration:
output=passthrough
The data on the record is not decoded and is returned as-is, as a byte array.
With configuration:
output=config
The configuration is returned on each record deserialization. For example, with configuration
output=config
other.property=some value
the deserializer will always return JSON like
{
  "output": "config",
  "other.property": "some value"
}
With output set to any other non-empty value (neither config nor passthrough), such as:
output=some constant output
the deserializer will always return a String value like
"some constant output"
Build jar:
sbt assembly - this builds the .jar file with all its dependencies bundled in it
Copy:
kubedev cp ./target/scala-2.13/configurable_deserializer.jar <pod_name>:/opt/conduktor/plugins/configurable_deserializer_0.5.jar -n conduktor