From bd0f7c55d01f646c80d5cf37c0c4a6bcefa27562 Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Wed, 15 Dec 2021 13:11:26 +0530 Subject: [PATCH 1/5] Added backpressure & ratelimit support --- .../streaming/pubsub/PubsubInputDStream.scala | 107 ++++++++++++++---- .../spark/streaming/pubsub/PubsubUtils.scala | 8 +- .../streaming/pubsub/PubsubStreamSuite.scala | 35 +++++- 3 files changed, 126 insertions(+), 24 deletions(-) diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala index 7357d231..99abee5b 100644 --- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala @@ -18,6 +18,9 @@ package org.apache.spark.streaming.pubsub import java.io.{Externalizable, ObjectInput, ObjectOutput} +import java.lang +import java.lang.Runtime +import java.util.concurrent.{Executors, TimeUnit} import scala.collection.JavaConverters._ import scala.util.control.NonFatal @@ -26,10 +29,11 @@ import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport import com.google.api.client.googleapis.json.GoogleJsonResponseException import com.google.api.client.json.jackson2.JacksonFactory import com.google.api.services.pubsub.Pubsub.Builder -import com.google.api.services.pubsub.model.{AcknowledgeRequest, PubsubMessage, PullRequest} -import com.google.api.services.pubsub.model.Subscription +import com.google.api.services.pubsub.model.{AcknowledgeRequest, PubsubMessage, PullRequest, ReceivedMessage, Subscription} import com.google.cloud.hadoop.util.RetryHttpInitializer +import com.google.common.util.concurrent.RateLimiter +import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream @@ -51,11 +55,16 @@ class PubsubInputDStream( val subscription: String, val credential: SparkGCPCredentials, val _storageLevel: StorageLevel, - val autoAcknowledge: Boolean + val autoAcknowledge: Boolean, + val maxNoOfMessageInRequest: Int, + conf: SparkConf ) extends ReceiverInputDStream[SparkPubsubMessage](_ssc) { override def getReceiver(): Receiver[SparkPubsubMessage] = { - new PubsubReceiver(project, topic, subscription, credential, _storageLevel, autoAcknowledge) + new PubsubReceiver( + project, topic, subscription, credential, _storageLevel, autoAcknowledge, + maxNoOfMessageInRequest, conf + ) } } @@ -222,7 +231,9 @@ class PubsubReceiver( subscription: String, credential: SparkGCPCredentials, storageLevel: StorageLevel, - autoAcknowledge: Boolean) + autoAcknowledge: Boolean, + maxNoOfMessageInRequest: Int, + conf: SparkConf) extends Receiver[SparkPubsubMessage](storageLevel) { val APP_NAME = "sparkstreaming-pubsub-receiver" @@ -233,6 +244,16 @@ class PubsubReceiver( val MAX_MESSAGE = 1000 + val maxRateLimit: Long = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) + + val blockSize: Int = conf.getInt("spark.streaming.blockQueueSize", maxNoOfMessageInRequest) + + var previousRate: Long = -1 + + lazy val rateLimiter: RateLimiter = RateLimiter.create(getInitialRateLimit.toDouble) + + lazy val scheduledExecutor = Executors.newSingleThreadScheduledExecutor() + lazy val client = new Builder( ConnectionUtils.transport, ConnectionUtils.jacksonFactory, @@ -262,15 +283,24 @@ class PubsubReceiver( } case None => // do 
nothing } + new Thread() { override def run() { receive() } }.start() + + // Scheduling update rate limit method at every second. + scheduledExecutor.scheduleAtFixedRate( + new Runnable { + override def run(): Unit = updateRateLimit() + }, 0, 1, TimeUnit.SECONDS + ) } def receive(): Unit = { - val pullRequest = new PullRequest().setMaxMessages(MAX_MESSAGE).setReturnImmediately(false) + val pullRequest = new PullRequest() + .setMaxMessages(maxNoOfMessageInRequest).setReturnImmediately(false) var backoff = INIT_BACKOFF while (!isStopped()) { try { @@ -278,21 +308,7 @@ class PubsubReceiver( client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute() val receivedMessages = pullResponse.getReceivedMessages if (receivedMessages != null) { - store(receivedMessages.asScala.toList - .map(x => { - val sm = new SparkPubsubMessage - sm.message = x.getMessage - sm.ackId = x.getAckId - sm - }) - .iterator) - - if (autoAcknowledge) { - val ackRequest = new AcknowledgeRequest() - ackRequest.setAckIds(receivedMessages.asScala.map(x => x.getAckId).asJava) - client.projects().subscriptions().acknowledge(subscriptionFullName, - ackRequest).execute() - } + pushToStoreAndAck(receivedMessages.asScala.toList) } backoff = INIT_BACKOFF } catch { @@ -308,5 +324,54 @@ class PubsubReceiver( } } + def getInitialRateLimit: Long = { + math.min( + conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), + maxRateLimit + ) + } + + /** + * Get the new recommended rate at which receiver should push data into store + * and update the rate limiter with new rate + */ + def updateRateLimit(): Unit = { + val newRateLimit = supervisor.getCurrentRateLimit.min(maxRateLimit) + if (newRateLimit > 0 && newRateLimit != previousRate) { + rateLimiter.setRate(newRateLimit) + previousRate = newRateLimit + } + } + + /** + * Push the list of received message into store and ack messages if auto ack is true + * @param receivedMessages + */ + def pushToStoreAndAck(receivedMessages: List[ReceivedMessage]): Unit = { + receivedMessages + .map(x => { + val sm = new SparkPubsubMessage + sm.message = x.getMessage + sm.ackId = x.getAckId + sm}) + .grouped(blockSize) + .foreach(messages => { + rateLimiter.acquire(messages.size) + store(messages.toIterator) + if (autoAcknowledge) acknowledgeIds(messages.map(_.ackId)) + }) + } + + /** + * Acknowledge Message ackIds + * @param ackIds + */ + def acknowledgeIds(ackIds: List[String]): Unit = { + val ackRequest = new AcknowledgeRequest() + ackRequest.setAckIds(ackIds.asJava) + client.projects().subscriptions() + .acknowledge(subscriptionFullName, ackRequest).execute() + } + override def onStop(): Unit = {} } diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala index 05214c34..035de782 100644 --- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala @@ -50,7 +50,8 @@ object PubsubUtils { subscription: String, credentials: SparkGCPCredentials, storageLevel: StorageLevel, - autoAcknowledge: Boolean = true): ReceiverInputDStream[SparkPubsubMessage] = { + autoAcknowledge: Boolean = true, + maxNoOfMessageInRequest: Int = 1000): ReceiverInputDStream[SparkPubsubMessage] = { ssc.withNamedScope("pubsub stream") { new PubsubInputDStream( @@ -60,7 +61,10 @@ object PubsubUtils { subscription, credentials, storageLevel, - autoAcknowledge) 
+ autoAcknowledge, + maxNoOfMessageInRequest, + ssc.conf + ) } } diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala index 8a67038d..2cc0f78d 100644 --- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala +++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala @@ -25,7 +25,7 @@ import scala.language.postfixOps import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.Eventually -import org.apache.spark.ConditionalSparkFunSuite +import org.apache.spark.{ConditionalSparkFunSuite, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext @@ -35,6 +35,8 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with Be val batchDuration = Seconds(1) + val blockSize = 10 + private val master: String = "local[2]" private val appName: String = this.getClass.getSimpleName @@ -70,6 +72,16 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with Be } } + + def setSparkBackPressureConf(conf: SparkConf) : Unit = { + conf.set("spark.streaming.backpressure.enabled", "true") + conf.set("spark.streaming.backpressure.initialRate", "50") + conf.set("spark.streaming.receiver.maxRate", "100") + conf.set("spark.streaming.backpressure.pid.minRate", "10") + conf.set("spark.streaming.blockQueueSize", blockSize.toString) + } + + before { ssc = new StreamingContext(master, appName, batchDuration) } @@ -113,6 +125,27 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with Be sendReceiveMessages(receiveStream) } + testIf("check block size", () => PubsubTestUtils.shouldRunTest()) { + setSparkBackPressureConf(ssc.sparkContext.conf) + val receiveStream = PubsubUtils.createStream( + ssc, PubsubTestUtils.projectId, Some(topicName), subForCreateName, + PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2, autoAcknowledge = true, 50) + + @volatile var partitionSize: Set[Int] = Set[Int]() + receiveStream.foreachRDD(rdd => { + rdd.collectPartitions().foreach(partition => { + partitionSize += partition.length + }) + }) + + ssc.start() + + eventually(timeout(100000 milliseconds), interval(1000 milliseconds)) { + pubsubTestUtils.publishData(topicFullName, pubsubTestUtils.generatorMessages(100)) + assert(partitionSize.max == blockSize) + } + } + private def sendReceiveMessages(receiveStream: ReceiverInputDStream[SparkPubsubMessage]): Unit = { @volatile var receiveMessages: List[SparkPubsubMessage] = List() receiveStream.foreachRDD { rdd => From ad74b81c0f02528509c38c26ef87f421795ced4f Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Thu, 16 Dec 2021 16:16:34 +0530 Subject: [PATCH 2/5] removed scheduled thread to update ratelimit --- .../streaming/pubsub/PubsubInputDStream.scala | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala index 99abee5b..bb14a3df 100644 --- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala @@ -242,18 +242,13 @@ class PubsubReceiver( val MAX_BACKOFF = 10 * 
1000 // 10s - val MAX_MESSAGE = 1000 - val maxRateLimit: Long = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) val blockSize: Int = conf.getInt("spark.streaming.blockQueueSize", maxNoOfMessageInRequest) - var previousRate: Long = -1 lazy val rateLimiter: RateLimiter = RateLimiter.create(getInitialRateLimit.toDouble) - lazy val scheduledExecutor = Executors.newSingleThreadScheduledExecutor() - lazy val client = new Builder( ConnectionUtils.transport, ConnectionUtils.jacksonFactory, @@ -289,13 +284,6 @@ class PubsubReceiver( receive() } }.start() - - // Scheduling update rate limit method at every second. - scheduledExecutor.scheduleAtFixedRate( - new Runnable { - override def run(): Unit = updateRateLimit() - }, 0, 1, TimeUnit.SECONDS - ) } def receive(): Unit = { @@ -307,6 +295,10 @@ class PubsubReceiver( val pullResponse = client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute() val receivedMessages = pullResponse.getReceivedMessages + + // update rate limit if required + updateRateLimit() + if (receivedMessages != null) { pushToStoreAndAck(receivedMessages.asScala.toList) } @@ -337,9 +329,8 @@ class PubsubReceiver( */ def updateRateLimit(): Unit = { val newRateLimit = supervisor.getCurrentRateLimit.min(maxRateLimit) - if (newRateLimit > 0 && newRateLimit != previousRate) { + if (rateLimiter.getRate != newRateLimit) { rateLimiter.setRate(newRateLimit) - previousRate = newRateLimit } } From 26703555ea92668e94d0f8d147c86d43942197b0 Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Mon, 20 Dec 2021 13:59:07 +0530 Subject: [PATCH 3/5] added comment --- .../streaming/pubsub/PubsubInputDStream.scala | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala index bb14a3df..d2a66604 100644 --- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala @@ -223,7 +223,28 @@ object ConnectionUtils { } } - +/** + * Custom spark receiver to pull messages from Pubsub topic and push into reliable store. + * If backpressure is enabled,the message ingestion rate for this receiver will be managed by Spark. + * + * Following spark configurations can be used to control rates and block size + * spark.streaming.backpressure.initialRate + * spark.streaming.receiver.maxRate + * spark.streaming.blockQueueSize: Controlling block size + * spark.streaming.backpressure.pid.minRate + * + * See Spark streaming configurations doc + * * + * NOTE: For given subscription assuming ackDeadlineSeconds is sufficient. + * So that messages will not expire if it is buffer for given blockIntervalMs + * * @param project Google cloud project id * @param topic Topic name for creating subscription if need * @param subscription Pub/Sub subscription name @@ -243,6 +245,9 @@ object ConnectionUtils { * @param storageLevel Storage level to be used * @param autoAcknowledge Acknowledge pubsub message or not * @param maxNoOfMessageInRequest Maximum number of message in a Pubsub pull request + * @param rateMultiplierFactor Increase the proposed rate estimated by PIDEstimator to take the + * advantage of dynamic allocation of executor. 
+ * Default should be 1 if dynamic allocation is not enabled * @param conf Spark config */ private[pubsub] @@ -254,6 +259,7 @@ class PubsubReceiver( storageLevel: StorageLevel, autoAcknowledge: Boolean, maxNoOfMessageInRequest: Int, + rateMultiplierFactor: Double, conf: SparkConf) extends Receiver[SparkPubsubMessage](storageLevel) { @@ -267,6 +273,11 @@ class PubsubReceiver( val blockSize: Int = conf.getInt("spark.streaming.blockQueueSize", maxNoOfMessageInRequest) + val blockIntervalMs: Long = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms") + + var buffer: ArrayBuffer[ReceivedMessage] = createBufferArray() + + var latestAttemptToPushInStoreTime: Long = -1 lazy val rateLimiter: RateLimiter = RateLimiter.create(getInitialRateLimit.toDouble) @@ -285,6 +296,7 @@ class PubsubReceiver( case Some(t) => val sub: Subscription = new Subscription sub.setTopic(s"$projectFullName/topics/$t") + sub.setAckDeadlineSeconds(30) try { client.projects().subscriptions().create(subscriptionFullName, sub).execute() } catch { @@ -311,8 +323,13 @@ class PubsubReceiver( val pullRequest = new PullRequest() .setMaxMessages(maxNoOfMessageInRequest).setReturnImmediately(false) var backoff = INIT_BACKOFF + + // To avoid the edge case when buffer is not full and no message pushed to store + latestAttemptToPushInStoreTime = System.currentTimeMillis() + while (!isStopped()) { try { + val pullResponse = client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute() val receivedMessages = pullResponse.getReceivedMessages @@ -320,9 +337,14 @@ class PubsubReceiver( // update rate limit if required updateRateLimit() + // Put data into buffer if (receivedMessages != null) { - pushToStoreAndAck(receivedMessages.asScala.toList) + buffer.appendAll(receivedMessages.asScala) } + + // Push data from buffer to store + push() + backoff = INIT_BACKOFF } catch { case e: GoogleJsonResponseException => @@ -349,29 +371,80 @@ class PubsubReceiver( * and update the rate limiter with new rate */ def updateRateLimit(): Unit = { - val newRateLimit = supervisor.getCurrentRateLimit.min(maxRateLimit) + val newRateLimit = rateMultiplierFactor * supervisor.getCurrentRateLimit.min(maxRateLimit) if (rateLimiter.getRate != newRateLimit) { rateLimiter.setRate(newRateLimit) } } + /** + * Push data into store if + * 1. buffer size greater than equal to blockSize, or + * 2. blockInterval time is passed and buffer size is less than blockSize + * + * Before pushing the messages, first create iterator of complete block(s) and partial blocks + * and assigning new array to buffer. + * + * So during pushing data into store if any {@link org.apache.spark.SparkException} occur + * then all un-push messages or un-ack will be lost. + * + * To recover lost messages we are relying on pubsub + * (i.e after ack deadline passed then pubsub will again give that messages) + */ + def push(): Unit = { + + val diff = System.currentTimeMillis() - latestAttemptToPushInStoreTime + if (buffer.length >= blockSize || (buffer.length < blockSize && diff >= blockIntervalMs)) { + + // grouping messages into complete and partial blocks (if any) + val (completeBlocks, partialBlock) = buffer.grouped(blockSize) + .partition(block => block.length == blockSize) + + // If completeBlocks is empty it means within block interval time + // messages in buffer is less than blockSize. 
So will push partial block + val iterator = if (completeBlocks.nonEmpty) completeBlocks else partialBlock + + // Will push partial block messages back to buffer if complete blocks formed + val partial = if (completeBlocks.nonEmpty && partialBlock.nonEmpty) { + partialBlock.next() + } else null + + while (iterator.hasNext) { + try { + pushToStoreAndAck(iterator.next().toList) + } catch { + case e: SparkException => reportError( + "Failed to write messages into reliable store", e) + case NonFatal(e) => reportError( + "Failed to write messages in reliable store", e) + } finally { + latestAttemptToPushInStoreTime = System.currentTimeMillis() + } + } + + // clear existing buffer messages + buffer.clear() + + // Pushing partial block messages back to buffer if complete blocks formed + if (partial != null) buffer.appendAll(partial) + } + } + /** * Push the list of received message into store and ack messages if auto ack is true * @param receivedMessages */ def pushToStoreAndAck(receivedMessages: List[ReceivedMessage]): Unit = { - receivedMessages + val messages = receivedMessages .map(x => { val sm = new SparkPubsubMessage sm.message = x.getMessage sm.ackId = x.getAckId sm}) - .grouped(blockSize) - .foreach(messages => { - rateLimiter.acquire(messages.size) - store(messages.toIterator) - if (autoAcknowledge) acknowledgeIds(messages.map(_.ackId)) - }) + + rateLimiter.acquire(messages.size) + store(messages.toIterator) + if (autoAcknowledge) acknowledgeIds(messages.map(_.ackId)) } /** @@ -385,5 +458,9 @@ class PubsubReceiver( .acknowledge(subscriptionFullName, ackRequest).execute() } + private def createBufferArray(): ArrayBuffer[ReceivedMessage] = { + new ArrayBuffer[ReceivedMessage](2 * math.max(maxNoOfMessageInRequest, blockSize)) + } + override def onStop(): Unit = {} } diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala index 035de782..909715f2 100644 --- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala @@ -51,7 +51,8 @@ object PubsubUtils { credentials: SparkGCPCredentials, storageLevel: StorageLevel, autoAcknowledge: Boolean = true, - maxNoOfMessageInRequest: Int = 1000): ReceiverInputDStream[SparkPubsubMessage] = { + maxNoOfMessageInRequest: Int = 1000, + rateMultiplierFactor: Double = 1.0): ReceiverInputDStream[SparkPubsubMessage] = { ssc.withNamedScope("pubsub stream") { new PubsubInputDStream( @@ -63,6 +64,7 @@ object PubsubUtils { storageLevel, autoAcknowledge, maxNoOfMessageInRequest, + rateMultiplierFactor, ssc.conf ) } diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala index 2cc0f78d..64b3f0e7 100644 --- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala +++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala @@ -35,7 +35,7 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with Be val batchDuration = Seconds(1) - val blockSize = 10 + val blockSize = 15 private val master: String = "local[2]" @@ -79,6 +79,7 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with Be conf.set("spark.streaming.receiver.maxRate", "100") 
conf.set("spark.streaming.backpressure.pid.minRate", "10") conf.set("spark.streaming.blockQueueSize", blockSize.toString) + conf.set("spark.streaming.blockInterval", "1000ms") } From 9f81a1cb448b8fcf8c31c449f5c74090bff459f9 Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Fri, 28 Jan 2022 11:21:17 +0530 Subject: [PATCH 5/5] Now we can pull data from pubsub regional endpoint also --- .../streaming/pubsub/PubsubInputDStream.scala | 14 ++++++++++---- .../spark/streaming/pubsub/PubsubUtils.scala | 7 ++++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala index f8c898ce..18c6b2b6 100644 --- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala @@ -32,6 +32,7 @@ import com.google.cloud.hadoop.util.RetryHttpInitializer import com.google.common.util.concurrent.RateLimiter import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream @@ -56,13 +57,14 @@ class PubsubInputDStream( val autoAcknowledge: Boolean, val maxNoOfMessageInRequest: Int, val rateMultiplierFactor: Double, + val endpoint: String, conf: SparkConf ) extends ReceiverInputDStream[SparkPubsubMessage](_ssc) { override def getReceiver(): Receiver[SparkPubsubMessage] = { new PubsubReceiver( project, topic, subscription, credential, _storageLevel, autoAcknowledge, - maxNoOfMessageInRequest, rateMultiplierFactor, conf + maxNoOfMessageInRequest, rateMultiplierFactor, endpoint, conf ) } } @@ -248,6 +250,7 @@ object ConnectionUtils { * @param rateMultiplierFactor Increase the proposed rate estimated by PIDEstimator to take the * advantage of dynamic allocation of executor. 
* Default should be 1 if dynamic allocation is not enabled + * @param endpoint Pubsub service endpoint * @param conf Spark config */ private[pubsub] @@ -260,8 +263,9 @@ class PubsubReceiver( autoAcknowledge: Boolean, maxNoOfMessageInRequest: Int, rateMultiplierFactor: Double, + endpoint: String, conf: SparkConf) - extends Receiver[SparkPubsubMessage](storageLevel) { + extends Receiver[SparkPubsubMessage](storageLevel) with Logging { val APP_NAME = "sparkstreaming-pubsub-receiver" @@ -285,8 +289,9 @@ class PubsubReceiver( ConnectionUtils.transport, ConnectionUtils.jacksonFactory, new RetryHttpInitializer(credential.provider, APP_NAME)) - .setApplicationName(APP_NAME) - .build() + .setApplicationName(APP_NAME) + .setRootUrl(endpoint) + .build() val projectFullName: String = s"projects/$project" val subscriptionFullName: String = s"$projectFullName/subscriptions/$subscription" @@ -374,6 +379,7 @@ class PubsubReceiver( val newRateLimit = rateMultiplierFactor * supervisor.getCurrentRateLimit.min(maxRateLimit) if (rateLimiter.getRate != newRateLimit) { rateLimiter.setRate(newRateLimit) + logInfo("New rateLimit:: " + newRateLimit) } } diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala index 909715f2..c87cbcd9 100644 --- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala @@ -17,12 +17,15 @@ package org.apache.spark.streaming.pubsub +import com.google.api.services.pubsub.Pubsub + import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.JavaReceiverInputDStream import org.apache.spark.streaming.api.java.JavaStreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream + object PubsubUtils { /** @@ -52,7 +55,8 @@ object PubsubUtils { storageLevel: StorageLevel, autoAcknowledge: Boolean = true, maxNoOfMessageInRequest: Int = 1000, - rateMultiplierFactor: Double = 1.0): ReceiverInputDStream[SparkPubsubMessage] = { + rateMultiplierFactor: Double = 1.0, + endpoint: String = Pubsub.DEFAULT_ROOT_URL): ReceiverInputDStream[SparkPubsubMessage] = { ssc.withNamedScope("pubsub stream") { new PubsubInputDStream( @@ -65,6 +69,7 @@ object PubsubUtils { autoAcknowledge, maxNoOfMessageInRequest, rateMultiplierFactor, + endpoint, ssc.conf ) }
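
As a usage reference for the options introduced across these patches, a minimal driver sketch follows (it is not part of the patches themselves). The project, topic, and subscription names are placeholders, and the SparkGCPCredentials.builder.build() call assumes the module's existing application-default credentials builder; the configuration keys and the createStream parameters (autoAcknowledge, maxNoOfMessageInRequest, rateMultiplierFactor, endpoint) are the ones added or exercised in the diffs above.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}

    object PubsubBackpressureExample {
      def main(args: Array[String]): Unit = {
        // Backpressure and block-sizing settings read by the receiver added in this series
        val conf = new SparkConf()
          .setAppName("pubsub-backpressure-example")
          .setIfMissing("spark.master", "local[2]")
          .set("spark.streaming.backpressure.enabled", "true")
          .set("spark.streaming.backpressure.initialRate", "50")   // rate used before the first PID estimate
          .set("spark.streaming.receiver.maxRate", "100")          // upper bound for the rate limiter
          .set("spark.streaming.backpressure.pid.minRate", "10")
          .set("spark.streaming.blockQueueSize", "100")            // messages per stored block
          .set("spark.streaming.blockInterval", "500ms")           // max wait before a partial block is flushed

        val ssc = new StreamingContext(conf, Seconds(1))

        // Placeholder GCP resource names; the credential call is assumed to be the
        // module's existing application-default credentials builder.
        val stream = PubsubUtils.createStream(
          ssc,
          "my-gcp-project",                        // hypothetical project id
          Some("my-topic"),                        // subscription is created from this topic if absent
          "my-subscription",
          SparkGCPCredentials.builder.build(),
          StorageLevel.MEMORY_AND_DISK_SER_2,
          autoAcknowledge = true,
          maxNoOfMessageInRequest = 1000,
          rateMultiplierFactor = 1.0)              // endpoint left at Pubsub.DEFAULT_ROOT_URL; a regional
                                                   // Pub/Sub endpoint URL could be passed as the next argument

        stream.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

With this configuration the receiver buffers pulled messages and stores them in blocks of spark.streaming.blockQueueSize messages, flushing a partial block once spark.streaming.blockInterval has elapsed without a full block forming; the rate limiter starts at backpressure.initialRate and is then updated from the supervisor's current rate, capped by receiver.maxRate and scaled by rateMultiplierFactor, while pid.minRate constrains Spark's PID estimator itself.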