@KevBrowne KevBrowne commented Nov 7, 2025

Summary

Implements configurable rate limiting and explicit source parallelism control for PscDynamicTableSource, giving users finer control over record consumption and operator parallelism.

Motivation

  1. Rate Limiting - Throttle record consumption at the source level to:

    • Mitigate large traffic flows that can cause Flink job instability
    • Prevent downstream operators from being overwhelmed
    • Enable gradual traffic ramp-up scenarios
  2. Explicit Source Parallelism Control - Allow users to configure scan.parallelism to:

    • Override default partition-based parallelism
    • Enable the rescale operator when the desired parallelism exceeds the partition count
    • Ensure correct rate limiting behavior with explicit parallelism configuration

Changes

New Files

  • PscRateLimitMap.java - RichMapFunction that applies per-record throttling using Guava's RateLimiter. Automatically divides the configured global rate across parallel subtasks. Emits metrics for monitoring:

    • throttleOccurred - Counter for number of throttle events
    • maxThrottleDelayMs - Gauge tracking maximum delay observed since job start
    • currentThrottleDelayMs - Gauge tracking real-time throttle delay
  • PscRateLimitMapTest.java - Unit tests validating rate limiter creation, rate division by parallelism, minimum rate validation, and edge cases

  • PscTableCommonUtilsTest.java - Comprehensive unit tests for shouldApplyRescale() decision logic covering 16 scenarios with mocked partition counts

Changes in Existing Files

Configuration:

  • PscConnectorOptions.java
    • Added scan.rate-limit.records-per-second (double, optional) - Global rate limit for all subtasks
    • Added scan.parallelism (integer, optional) - Explicit source operator parallelism

Rescale Decision Logic:

  • PscTableCommonUtils.java
    • Updated shouldApplyRescale() to prioritize scan.parallelism over global default parallelism
    • Added PartitionCountProvider interface for dependency injection in tests
    • Added setProviderForTest() and resetProvider() methods with @VisibleForTesting annotation
    • Replaced hard-coded configuration strings with ConfigOption.key() references
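
For readers unfamiliar with this kind of test seam, the provider hook could look roughly like the sketch below. Only the names PartitionCountProvider, setProviderForTest() and resetProvider() come from this PR; the method signature, the holder class name and the placeholder default that returns -1 are assumptions made for illustration.

```java
/** Looks up the partition count for a topic URI; -1 means "unknown" in this sketch. */
interface PartitionCountProvider {
    int getPartitionCount(String topicUri);
}

/** Hypothetical holder showing how a swappable provider enables mocked partition counts. */
final class PartitionCountSeamSketch {
    // Placeholder default: the real code would query topic metadata here (assumption).
    private static final PartitionCountProvider DEFAULT = topicUri -> -1;

    private static volatile PartitionCountProvider provider = DEFAULT;

    /** Test-only hook: replaces the provider so tests can return fixed partition counts. */
    static void setProviderForTest(PartitionCountProvider testProvider) {
        provider = testProvider;
    }

    /** Restores the default provider; intended to be called after each test. */
    static void resetProvider() {
        provider = DEFAULT;
    }

    static int partitionCount(String topicUri) {
        return provider.getPartitionCount(topicUri);
    }
}
```

A test would call setProviderForTest(uri -> 16) before exercising shouldApplyRescale() and resetProvider() afterwards, so no broker is needed.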

Core Implementation:

  • PscDynamicSource.java

    • Added scanParallelism field and updated all constructors
    • Critical fix: Corrected operator ordering to apply rescale() BEFORE rate limiting
    • Source parallelism defaults to partition count (Flink's natural behavior for Kafka-like sources)
    • Rate limiter parallelism calculation:
      • If rescale enabled: use intended parallelism (scan.parallelism or global default)
      • If rescale disabled: use source parallelism (partition count)
    • Added isRateLimitingEnabled() utility method for reusable condition checking
    • Added getIntendedParallelism() helper method to encapsulate parallelism logic
    • Updated copy(), equals(), and hashCode() to include new fields
  • PscDynamicTableFactory.java

    • Reads scan.parallelism and scan.rate-limit.records-per-second configurations
    • Passes scanParallelism to shouldApplyRescale() and source constructor
    • Added logging for scan parallelism and rate limiting states
    • Uses isRateLimitingEnabled() for consistent condition checks
  • UpsertPscDynamicTableFactory.java

    • Same updates as PscDynamicTableFactory for upsert sources

Testing:

  • PscDynamicTableFactoryTest.java
    • Updated createExpectedScanSource() to include scanParallelism parameter
    • Added helper method produceTransformationFromSource() to reduce repetitive test code
    • Added helper methods addRescaleConfig(), addRateLimitConfig(), addScanParallelismConfig() using ConfigOption.key()
    • Added @AfterEach hook to reset partition count provider after each test
    • Added four new tests with mocked partition counts:
      • testRescaleCreatesPartitionTransformation() - Verifies PartitionTransformation creation
      • testRescaleAndRateLimitChain() - Verifies complete operator chain
      • testSkipsRescaleWhenNotNeeded() - Verifies rescale is skipped when not needed
      • testRescaleAndRateLimitWithDifferentParallelism() - Verifies correct parallelism when scan.parallelism differs from global default
    • Existing tests continue to validate default behavior, rate limit configurations, and edge cases

Implementation Details

The rate limiter is applied as a map transformation after the source operator:

  1. User configures total rate limit (e.g., 1000 records/sec)
  2. At runtime, PscRateLimitMap.open() divides the rate by parallelism
  3. Each subtask gets an independent RateLimiter instance (e.g., 250 records/sec with parallelism of 4)
  4. Records are throttled using tryAcquire() + acquire() pattern
  5. Metrics track throttle occurrences and maximum delay
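
The tryAcquire() + acquire() pattern from step 4 can be sketched roughly as follows. This is an illustration rather than the code in this PR: Limiter is a hypothetical stand-in interface that mirrors the two Guava RateLimiter calls involved (non-blocking tryAcquire(), and blocking acquire() that returns the seconds spent waiting), and plain fields stand in for the Flink counter and gauges.

```java
/** Hypothetical stand-in for the two rate limiter calls the pattern relies on. */
interface Limiter {
    /** Returns true if a permit was available immediately (non-blocking). */
    boolean tryAcquire();

    /** Blocks until a permit is available; returns the seconds spent waiting. */
    double acquire();
}

/** Sketch of per-record throttling with the three metrics named in the description. */
final class ThrottleSketch {
    private final Limiter limiter;
    // Plain fields stand in for Flink's Counter and Gauge types (assumption).
    private long throttleOccurred = 0;
    private long maxThrottleDelayMs = 0;
    private long currentThrottleDelayMs = 0;

    ThrottleSketch(Limiter limiter) {
        this.limiter = limiter;
    }

    /** Passes the record through unchanged, waiting first if no permit is free. */
    <T> T map(T record) {
        if (limiter.tryAcquire()) {
            // Fast path: a permit was free, so no throttling happened.
            currentThrottleDelayMs = 0;
        } else {
            // Slow path: count the throttle event, then block for a permit.
            throttleOccurred++;
            long waitedMs = Math.round(limiter.acquire() * 1000.0);
            currentThrottleDelayMs = waitedMs;
            maxThrottleDelayMs = Math.max(maxThrottleDelayMs, waitedMs);
        }
        return record;
    }

    long getThrottleOccurred() { return throttleOccurred; }
    long getMaxThrottleDelayMs() { return maxThrottleDelayMs; }
    long getCurrentThrottleDelayMs() { return currentThrottleDelayMs; }
}
```

Trying the non-blocking call first keeps the unthrottled path cheap and makes it easy to count only the records that actually had to wait.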

Rate limiting is disabled by default. Users must explicitly enable it.

Tests:

  • PscDynamicTableFactoryTest.java
    • Updated constructor calls to include scanParallelism parameter
    • Existing tests validate rate limit disabled (default), enabled, combined with rescale, and fractional rates

Implementation Details

Operator Ordering (Critical Fix)

The implementation ensures correct rate limiting by applying operators in this order:

  1. Source operator - Parallelism naturally determined by partition count (e.g., 16 partitions = 16 subtasks, all active)
  2. Rescale operator (if enabled) - Redistributes data from source parallelism to intended parallelism (e.g., 16 → 40 subtasks)
  3. Rate limiter (if enabled) - Applied AFTER rescale with correct parallelism
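
To make the ordering concrete, the parallelism handed to the rate limiter can be thought of along these lines. This is a simplified sketch based on the rules listed in this description; the class and method names are hypothetical and do not match the helpers added to PscDynamicSource.

```java
/** Sketch of which parallelism the rate limiter divides the global rate by. */
final class RateLimiterParallelismSketch {

    /** scan.parallelism if configured and positive, otherwise the global default. */
    static int intendedParallelism(Integer scanParallelism, int globalDefaultParallelism) {
        return (scanParallelism != null && scanParallelism > 0)
                ? scanParallelism
                : globalDefaultParallelism;
    }

    /**
     * With rescale applied, the limiter runs at the intended parallelism;
     * without it, the limiter stays at the source parallelism (partition count).
     */
    static int rateLimiterParallelism(
            boolean rescaleApplied,
            Integer scanParallelism,
            int globalDefaultParallelism,
            int partitionCount) {
        return rescaleApplied
                ? intendedParallelism(scanParallelism, globalDefaultParallelism)
                : partitionCount;
    }
}
```

With 16 partitions and scan.parallelism set to 40, the limiter divides the global rate by 40 when rescale is applied and by 16 when it is not. Dividing by 40 while only 16 subtasks receive data would under-deliver the configured rate, which is why the limiter has to sit after the rescale.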

Usage Examples

Basic Rate Limiting

CREATE TABLE my_source (
  id BIGINT,
  name STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'psc',
  'topic-uri' = 'kafka:kafka-cluster:topic-name',
  'scan.rate-limit.records-per-second' = '5000',
  'value.format' = 'json'
);

With 10 topic partitions, each source subtask is capped at 500 records/sec (5000 / 10).

Rate Limiting with Explicit Source Parallelism

CREATE TABLE my_source (
  id BIGINT,
  name STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'psc',
  'topic-uri' = 'kafka:kafka-cluster:topic-name',
  'scan.parallelism' = '100',                      -- Explicit source parallelism
  'scan.enable-rescale' = 'true',                  -- Enable rescale
  'scan.rate-limit.records-per-second' = '50000',  -- Rate limit after rescale
  'value.format' = 'json'
);

Behavior:

  • Assuming 16 topic partitions
  • Source operator: 16 subtasks (one per partition)
  • Rescale operator: redistributes to 100 subtasks
  • Rate limiter: 100 subtasks, each capped at 500 records/sec (50000 / 100)

Rate Limiting Without Rescale

CREATE TABLE my_source (
  id BIGINT,
  name STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'psc',
  'topic-uri' = 'kafka:kafka-cluster:topic-name',
  'scan.enable-rescale' = 'false',                 -- Disable rescale
  'scan.rate-limit.records-per-second' = '10000',
  'value.format' = 'json'
);

Behavior:

  • Source and rate limiter stay at partition count parallelism
  • No data redistribution

Test Coverage

Unit Tests - PscRateLimitMapTest.java

  • Rate limiter creation with valid configuration
  • Per-subtask rate division by parallelism
  • Minimum rate validation (fails if subtask rate < 0.1 QPS)
  • Edge cases: null rates, negative rates, zero parallelism
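
The division and validation these tests cover can be illustrated with the sketch below. The 0.1 records/sec floor is taken from the list above; the method name and the choice to reject invalid input with IllegalArgumentException are assumptions, and the real PscRateLimitMap may handle null or negative rates differently.

```java
/** Sketch of splitting a global rate limit across parallel subtasks. */
final class SubtaskRateSketch {
    /** Minimum per-subtask rate, per the validation described above. */
    static final double MIN_SUBTASK_RATE = 0.1;

    static double perSubtaskRate(Double globalRate, int parallelism) {
        if (globalRate == null || globalRate <= 0) {
            throw new IllegalArgumentException("rate limit must be a positive number");
        }
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        double rate = globalRate / parallelism;
        if (rate < MIN_SUBTASK_RATE) {
            // Too many subtasks for this global rate to be enforced meaningfully.
            throw new IllegalArgumentException(
                    "per-subtask rate " + rate + " is below " + MIN_SUBTASK_RATE);
        }
        return rate;
    }
}
```

For example, a global limit of 1000 records/sec at parallelism 4 yields 250 records/sec per subtask, while 1 record/sec at parallelism 100 is rejected because 0.01 falls below the floor.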

Unit Tests - PscTableCommonUtilsTest.java (16 test cases)

| Scenario | Test Method | Result |
|---|---|---|
| Rescale disabled | testShouldNotRescaleWhenDisabled() | No rescale |
| scan.parallelism > partitions | testShouldRescaleWhenScanParallelismExceedsPartitionCount() | Rescale applied |
| scan.parallelism < partitions | testShouldNotRescaleWhenScanParallelismLessThanPartitionCount() | No rescale |
| scan.parallelism == partitions | testShouldNotRescaleWhenScanParallelismEqualsPartitionCount() | No rescale |
| global parallelism > partitions | testShouldRescaleWhenGlobalParallelismExceedsPartitionCount() | Rescale applied |
| global parallelism < partitions | testShouldNotRescaleWhenGlobalParallelismLessThanPartitionCount() | No rescale |
| scan.parallelism = 0 (invalid) | testShouldNotRescaleWhenScanParallelismIsZero() | No rescale |
| scan.parallelism = -1 (invalid) | testShouldNotRescaleWhenScanParallelismIsNegative() | No rescale |
| No parallelism configured | testShouldNotRescaleWhenNoParallelismConfigured() | No rescale |
| partition count = -1 | testShouldNotRescaleWhenPartitionCountCannotBeDetermined() | No rescale |
| partition count = 0 | testShouldNotRescaleWhenPartitionCountIsZero() | No rescale |
| scan.parallelism takes precedence | testScanParallelismTakesPrecedenceOverGlobalParallelism() | Uses scan.parallelism |
| Multi-topic scenarios | testShouldRescaleWithMultipleTopics() | Uses minimum partition count |
| High parallelism (2000 > 10) | testShouldRescaleWithHighParallelismAndLowPartitionCount() | Rescale applied |
| High partitions (2 < 1000) | testShouldNotRescaleWithHighPartitionCountAndLowParallelism() | No rescale |
| Provider reset mechanism | testProviderResetRestoresDefaultBehavior() | Provider restored |
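
Read together, the scenarios above suggest decision logic along the following lines. This is a reconstruction from the test matrix only, with a hypothetical signature. In particular, treating a non-positive scan.parallelism as "no rescale" without falling back to the global parallelism is an assumption, since the matrix does not say which global value those two tests use.

```java
import java.util.Collections;
import java.util.List;

/** Sketch of the rescale decision implied by the test scenarios above. */
final class RescaleDecisionSketch {

    static boolean shouldApplyRescale(
            boolean rescaleEnabled,
            Integer scanParallelism,      // null when scan.parallelism is not configured
            Integer globalParallelism,    // null when no global default is configured
            List<Integer> partitionCounts // one entry per topic; -1 means unknown
    ) {
        if (!rescaleEnabled) {
            return false;
        }
        // scan.parallelism, when configured, takes precedence over the global default.
        Integer desired = (scanParallelism != null) ? scanParallelism : globalParallelism;
        if (desired == null || desired <= 0) {
            return false; // nothing configured, or an invalid value
        }
        if (partitionCounts == null || partitionCounts.isEmpty()) {
            return false;
        }
        // With multiple topics, compare against the smallest partition count.
        int minPartitions = Collections.min(partitionCounts);
        if (minPartitions <= 0) {
            return false; // partition count unknown or zero
        }
        return desired > minPartitions;
    }
}
```

Rescale is only worthwhile when the desired parallelism strictly exceeds the partition count. Otherwise every downstream subtask can already be fed directly by a source subtask.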

psc % mvn -pl psc-flink test -Dtest=PscTableCommonUtilsTest,PscDynamicTableFactoryTest,PscRateLimitMapTest -Dgpg.skip=true -Djacoco.skip=true
WARNING: package sun.misc not in java.base
WARNING: A terminally deprecated method in sun.misc.Unsafe has been called
WARNING: sun.misc.Unsafe::staticFieldBase has been called by com.google.inject.internal.aop.HiddenClassDefiner (file:/opt/homebrew/Cellar/maven/3.9.11/libexec/lib/guice-5.1.0-classes.jar)
WARNING: Please consider reporting this to the maintainers of class com.google.inject.internal.aop.HiddenClassDefiner
WARNING: sun.misc.Unsafe::staticFieldBase will be removed in a future release
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.pinterest.psc:psc-examples:jar:4.1.3-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-source-plugin is missing. @ com.pinterest.psc:psc-java-oss:4.1.3-SNAPSHOT, /Users/kbrowne/code/psc/pom.xml, line 155, column 21
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.pinterest.psc:psc-integration-test:jar:4.1.3-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-source-plugin is missing. @ com.pinterest.psc:psc-java-oss:4.1.3-SNAPSHOT, /Users/kbrowne/code/psc/pom.xml, line 155, column 21
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.pinterest.psc:psc-flink:jar:4.1.3-SNAPSHOT
[WARNING] 'dependencies.dependency.(groupId:artifactId:type:classifier)' must be unique: org.apache.flink:flink-table-planner_${scala.binary.version}:jar -> duplicate declaration of version ${flink.version} @ line 327, column 21
[WARNING] 'dependencies.dependency.(groupId:artifactId:type:classifier)' must be unique: org.apache.flink:flink-json:jar -> duplicate declaration of version ${flink.version} @ line 417, column 21
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.pinterest.psc:psc-logging:jar:4.1.3-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-source-plugin is missing. @ com.pinterest.psc:psc-java-oss:4.1.3-SNAPSHOT, /Users/kbrowne/code/psc/pom.xml, line 155, column 21
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.pinterest.psc:psc-common:jar:4.1.3-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-source-plugin is missing. @ com.pinterest.psc:psc-java-oss:4.1.3-SNAPSHOT, /Users/kbrowne/code/psc/pom.xml, line 155, column 21
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.pinterest.psc:psc-flink-logging:jar:4.1.3-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-source-plugin is missing. @ com.pinterest.psc:psc-java-oss:4.1.3-SNAPSHOT, /Users/kbrowne/code/psc/pom.xml, line 155, column 21
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.pinterest.psc:psc-java-oss:pom:4.1.3-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-source-plugin is missing. @ line 155, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO] Inspecting build with total of 1 modules...
[INFO] Installing Nexus Staging features:
[INFO] ... total of 1 executions of maven-deploy-plugin replaced with nexus-staging-maven-plugin
[INFO]
[INFO] --------------------< com.pinterest.psc:psc-flink >---------------------
[INFO] Building psc-flink 4.1.3-SNAPSHOT
[INFO] from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[WARNING] 1 problem was encountered while building the effective model for org.javassist:javassist:jar:3.18.2-GA during dependency collection step for project (use -X to see details)
[WARNING] 2 problems were encountered while building the effective model for org.apache.yetus:audience-annotations:jar:0.5.0 during dependency collection step for project (use -X to see details)
[WARNING] 1 problem was encountered while building the effective model for org.javassist:javassist:jar:3.18.1-GA during dependency collection step for project (use -X to see details)
[INFO]
[INFO] --- jacoco:0.8.5:prepare-agent (prepare-unit-tests) @ psc-flink ---
[INFO] Skipping JaCoCo execution because property jacoco.skip is set.
[INFO] argLine set to empty
[INFO]
[INFO] --- resources:3.3.1:resources (default-resources) @ psc-flink ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource from src/main/resources to target/classes
[INFO]
[INFO] --- compiler:3.8.1:compile (default-compile) @ psc-flink ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- resources:3.3.1:testResources (default-testResources) @ psc-flink ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 99 resources from src/test/resources to target/test-classes
[INFO]
[INFO] --- compiler:3.8.1:testCompile (default-testCompile) @ psc-flink ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- surefire:3.0.0-M5:test (default-test) @ psc-flink ---
[INFO]
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.pinterest.flink.streaming.connectors.psc.table.PscRateLimitMapTest
[INFO] Tests run: 9, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.318 s - in com.pinterest.flink.streaming.connectors.psc.table.PscRateLimitMapTest
[INFO] Running com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtilsTest
[INFO] Tests run: 16, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.19 s - in com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtilsTest
[INFO] Running com.pinterest.flink.streaming.connectors.psc.table.PscDynamicTableFactoryTest
[INFO] Tests run: 70, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.401 s - in com.pinterest.flink.streaming.connectors.psc.table.PscDynamicTableFactoryTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 95, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.665 s
[INFO] Finished at: 2025-11-24T13:28:44-05:00
[INFO] ------------------------------------------------------------------------

@nickpan47 nickpan47 left a comment

Thanks for the PR. Left a few comments. PTAL.

@KevBrowne KevBrowne requested a review from nickpan47 November 13, 2025 17:06
@KevBrowne KevBrowne changed the title from "adding rate limiter" to "adding PSC-Flink Source rate limiter" Nov 13, 2025

@nickpan47 nickpan47 left a comment

Added a few discussions to the PR. PTAL. Thanks!

@nickpan47 nickpan47 left a comment

Thanks for the refactor. The logic is much cleaner now. Overall lgtm. PTAL of the minor comments.

@KevBrowne KevBrowne requested a review from nickpan47 November 18, 2025 18:17

@nickpan47 nickpan47 left a comment

Overall lgtm. Some minor suggestions to improve unit tests. Thanks!

@KevBrowne KevBrowne requested a review from nickpan47 November 21, 2025 02:02
@KevBrowne KevBrowne marked this pull request as ready for review November 21, 2025 02:02
@KevBrowne KevBrowne requested a review from a team as a code owner November 21, 2025 02:02

@nickpan47 nickpan47 left a comment

Overall good. One suggestion to make sure we cover the different conditions w.r.t. topic partition vs scan.parallelism w/ mocks.

@KevBrowne KevBrowne requested a review from nickpan47 November 24, 2025 18:36

@nickpan47 nickpan47 left a comment

One final nit on the usage of WhiteBox. Otherwise, lgtm. Thanks!

@KevBrowne KevBrowne requested a review from nickpan47 November 25, 2025 03:38

@nickpan47 nickpan47 left a comment

lgtm. Thanks for the revisions!

@jeffxiang jeffxiang merged commit bce3b40 into pinterest:main Nov 25, 2025
2 checks passed
jeffxiang pushed a commit that referenced this pull request Dec 8, 2025
* adding rate limiter

* fix scale rate limit ordering

* introduce parallelism scan by source

* fix readability

* code cleanup + add tests

* fix tests

* push partition class

* refactor unit tests

* adding assertions + renaming

* remove whitebox in favor of  setProviderTest

* using setProvider test

---------

Co-authored-by: Kevin Browne <kbrowne@kbrowne-JG645XV.dyn.pinadmin.com>