From 37f7fb3b1dad30ee43b3f36a6061b133f2727fe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Schm=C3=B6le?= Date: Fri, 22 Jan 2021 09:34:50 +0100 Subject: [PATCH] [BAHIR-256] Add conversions to Spark-compatible types In org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala, added conversions to the 'topic' and 'timestamp' fields in order to make them compatible with Spark SQL. --- .../bahir/sql/streaming/mqtt/MQTTStreamSource.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala index e1314ae2..0263e7a1 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala @@ -33,11 +33,13 @@ import org.eclipse.paho.client.mqttv3._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport} import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2} import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String import org.apache.bahir.utils.Logging @@ -199,8 +201,12 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc } override def get(): InternalRow = { - InternalRow(slice(currentIdx).id, slice(currentIdx).topic, - slice(currentIdx).payload, slice(currentIdx).timestamp) + InternalRow( + slice(currentIdx).id, + UTF8String.fromString(slice(currentIdx).topic), + slice(currentIdx).payload, + DateTimeUtils.fromJavaTimestamp(slice(currentIdx).timestamp) + ) } override def close(): Unit = {}