diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala index e1314ae2..0263e7a1 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala @@ -33,11 +33,13 @@ import org.eclipse.paho.client.mqttv3._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport} import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2} import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String import org.apache.bahir.utils.Logging @@ -199,8 +201,12 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc } override def get(): InternalRow = { - InternalRow(slice(currentIdx).id, slice(currentIdx).topic, - slice(currentIdx).payload, slice(currentIdx).timestamp) + InternalRow( + slice(currentIdx).id, + UTF8String.fromString(slice(currentIdx).topic), + slice(currentIdx).payload, + DateTimeUtils.fromJavaTimestamp(slice(currentIdx).timestamp) + ) } override def close(): Unit = {}