From 9dcc892ae5baef1512db6226b9631546209aeff8 Mon Sep 17 00:00:00 2001
From: Kevin Browne
Date: Tue, 21 Oct 2025 16:20:20 -0400
Subject: [PATCH 1/9] Adding configurable option to set source parallelism

---
 .../psc/table/PscConnectorOptions.java        |  2 +
 .../psc/table/PscDynamicSource.java           | 25 ++++++++--
 .../psc/table/PscDynamicTableFactory.java     | 13 +++--
 .../table/UpsertPscDynamicTableFactory.java   |  7 ++-
 .../psc/table/PscDynamicTableFactoryTest.java | 50 ++++++++++++++++++-
 5 files changed, 88 insertions(+), 9 deletions(-)

diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java
index 21338b0d..f39f21f8 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java
@@ -109,6 +109,8 @@ public class PscConnectorOptions {
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
+    public static final ConfigOption<Integer> SCAN_PARALLELISM = FactoryUtil.SCAN_PARALLELISM;
+
     // --------------------------------------------------------------------------------------------
     // Psc specific options
     // --------------------------------------------------------------------------------------------
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
index 2ca65613..c2905079 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
@@ -169,6 +169,9 @@ public class PscDynamicSource
     /** Optional user-provided UID prefix for stabilizing operator UIDs across DAG changes. */
     protected final @Nullable String sourceUidPrefix;
 
+    /** Optional parallelism for the source operator. If not set, uses Flink's default. */
+    protected final @Nullable Integer sourceParallelism;
+
     public PscDynamicSource(
             DataType physicalDataType,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -187,7 +190,8 @@ public PscDynamicSource(
             long boundedTimestampMillis,
             boolean upsertMode,
             String tableIdentifier,
-            @Nullable String sourceUidPrefix) {
+            @Nullable String sourceUidPrefix,
+            @Nullable Integer sourceParallelism) {
         // Format attributes
         this.physicalDataType =
                 Preconditions.checkNotNull(
@@ -228,11 +232,12 @@ public PscDynamicSource(
         this.upsertMode = upsertMode;
         this.tableIdentifier = tableIdentifier;
         this.sourceUidPrefix = sourceUidPrefix;
+        this.sourceParallelism = sourceParallelism;
     }
 
     /**
-     * Backward-compatible constructor without UID prefix. Delegates to the full constructor with a
-     * null {@code sourceUidPrefix}.
+     * Backward-compatible constructor without UID prefix and parallelism. Delegates to the full
+     * constructor with null {@code sourceUidPrefix} and {@code sourceParallelism}.
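+     *
+     * <p>Illustrative sketch of a call site (the argument names are placeholders, not
+     * from this patch): existing callers of the shorter constructor keep compiling and
+     * run with no UID prefix and the default source parallelism:
+     *
+     * <pre>{@code
+     * PscDynamicSource source = new PscDynamicSource(
+     *         physicalDataType, keyDecodingFormat, valueDecodingFormat,
+     *         keyProjection, valueProjection, keyPrefix, topicUris, topicUriPattern,
+     *         properties, startupMode, specificStartupOffsets, startupTimestampMillis,
+     *         boundedMode, specificBoundedOffsets, boundedTimestampMillis,
+     *         upsertMode, tableIdentifier);
+     * }</pre>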
*/ public PscDynamicSource( DataType physicalDataType, @@ -270,6 +275,7 @@ public PscDynamicSource( boundedTimestampMillis, upsertMode, tableIdentifier, + null, null); } @@ -302,6 +308,12 @@ public DataStream produceDataStream( DataStreamSource sourceStream = execEnv.fromSource( pscSource, watermarkStrategy, "PscSource-" + tableIdentifier); + + // Apply explicit parallelism if configured + if (sourceParallelism != null && sourceParallelism > 0) { + sourceStream.setParallelism(sourceParallelism); + } + // Prefer explicit user-provided UID prefix if present; otherwise rely on provider context. if (sourceUidPrefix != null) { final String trimmedPrefix = sourceUidPrefix.trim(); @@ -396,7 +408,8 @@ public DynamicTableSource copy() { boundedTimestampMillis, upsertMode, tableIdentifier, - sourceUidPrefix); + sourceUidPrefix, + sourceParallelism); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; @@ -436,6 +449,8 @@ public boolean equals(Object o) { && boundedTimestampMillis == that.boundedTimestampMillis && Objects.equals(upsertMode, that.upsertMode) && Objects.equals(tableIdentifier, that.tableIdentifier) + && Objects.equals(sourceUidPrefix, that.sourceUidPrefix) + && Objects.equals(sourceParallelism, that.sourceParallelism) && Objects.equals(watermarkStrategy, that.watermarkStrategy); } @@ -461,6 +476,8 @@ public int hashCode() { boundedTimestampMillis, upsertMode, tableIdentifier, + sourceUidPrefix, + sourceParallelism, watermarkStrategy); } diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java index 194cd629..ebac093d 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java @@ -75,6 +75,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_PARALLELISM; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI; @@ -149,6 +150,7 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_MODE); options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); + options.add(SCAN_PARALLELISM); options.add(SOURCE_UID_PREFIX); return options; } @@ -212,6 +214,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + final Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); + return createPscTableSource( physicalDataType, keyDecodingFormat.orElse(null), @@ -229,7 +233,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, 
context.getObjectIdentifier().asSummaryString(), - tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null)); + tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), + scanParallelism); } @Override @@ -395,7 +400,8 @@ protected PscDynamicSource createPscTableSource( Map specificEndOffsets, long endTimestampMillis, String tableIdentifier, - @Nullable String sourceUidPrefix) { + @Nullable String sourceUidPrefix, + @Nullable Integer scanParallelism) { return new PscDynamicSource( physicalDataType, keyDecodingFormat, @@ -414,7 +420,8 @@ protected PscDynamicSource createPscTableSource( endTimestampMillis, false, tableIdentifier, - sourceUidPrefix); + sourceUidPrefix, + scanParallelism); } protected PscDynamicSink creatPscTableSink( diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java index 3e599897..93d5750b 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java @@ -61,6 +61,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_MODE; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_PARALLELISM; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM; @@ -112,6 +113,7 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_MODE); options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); + options.add(SCAN_PARALLELISM); options.add(DELIVERY_GUARANTEE); options.add(TRANSACTIONAL_ID_PREFIX); options.add(SOURCE_UID_PREFIX); @@ -150,6 +152,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { final PscConnectorOptionsUtil.BoundedOptions boundedOptions = getBoundedOptions(tableOptions); + final Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); + return new PscDynamicSource( context.getPhysicalRowDataType(), keyDecodingFormat, @@ -168,7 +172,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.boundedTimestampMillis, true, context.getObjectIdentifier().asSummaryString(), - tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null)); + tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), + scanParallelism); } @Override diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java index 3d253b6e..348cd3cf 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java @@ -1246,6 +1246,52 @@ public void 
testTableSinkWithParallelism() { assertThat((long) sinkProvider.getParallelism().get()).isEqualTo(100); } + @Test + public void testTableSourceWithParallelism() { + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), options -> options.put("scan.parallelism", "10")); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + + // Verify the parallelism is stored in the source + assertThat(pscSource.sourceParallelism).isEqualTo(10); + + // Verify that the source equals the expected source with parallelism set + final Map specificOffsets = new HashMap<>(); + specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_0), OFFSET_0); + specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_1), OFFSET_1); + + final DecodingFormat> valueDecodingFormat = + new DecodingFormatMock(",", true); + + final PscDynamicSource expectedPscSource = + new PscDynamicSource( + SCHEMA_DATA_TYPE, + null, + valueDecodingFormat, + new int[0], + new int[] {0, 1, 2}, + null, + Collections.singletonList(TOPIC_URI), + null, + PSC_SOURCE_PROPERTIES, + StartupMode.SPECIFIC_OFFSETS, + specificOffsets, + 0, + BoundedMode.UNBOUNDED, + Collections.emptyMap(), + 0, + false, + FactoryMocks.IDENTIFIER.asSummaryString(), + null, + 10); + assertThat(pscSource).isEqualTo(expectedPscSource); + } + @Test public void testTableSinkAutoCompleteSchemaRegistrySubject() { // only format @@ -1729,7 +1775,9 @@ private static PscDynamicSource createExpectedScanSource( Collections.emptyMap(), 0, false, - FactoryMocks.IDENTIFIER.asSummaryString()); + FactoryMocks.IDENTIFIER.asSummaryString(), + null, + null); } private static PscDynamicSink createExpectedSink( From 051e4372bd8c6b543fc170bc477e4051b2a752f0 Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Tue, 21 Oct 2025 16:37:29 -0400 Subject: [PATCH 2/9] fixes --- .../connectors/psc/table/PscConnectorOptions.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java index f39f21f8..45f46dc2 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java @@ -109,7 +109,15 @@ public class PscConnectorOptions { public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; - public static final ConfigOption SCAN_PARALLELISM = FactoryUtil.SCAN_PARALLELISM; + public static final ConfigOption SCAN_PARALLELISM = + ConfigOptions.key("scan.parallelism") + .intType() + .noDefaultValue() + .withDescription( + "Defines a custom parallelism for the scan source. " + + "By default, if this option is not defined, the planner will derive the parallelism " + + "based on the number of partitions. 
When this option is set, the specified " + + "parallelism will be used and partitions will be distributed across all readers."); // -------------------------------------------------------------------------------------------- // Psc specific options From 16708a998fc83db7e187429bf4e010d953b0e341 Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Wed, 22 Oct 2025 19:26:15 -0400 Subject: [PATCH 3/9] make source infer parallelism --- psc-flink/pom.xml | 2 +- .../psc/table/PscConnectorOptions.java | 17 ++ .../psc/table/PscDynamicTableFactory.java | 39 +++- .../psc/table/PscTableSourceUtil.java | 174 ++++++++++++++++++ .../table/UpsertPscDynamicTableFactory.java | 39 +++- .../psc/table/PscDynamicTableFactoryTest.java | 61 ++++++ .../psc/table/PscTableSourceUtilTest.java | 121 ++++++++++++ 7 files changed, 450 insertions(+), 3 deletions(-) create mode 100644 psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtil.java create mode 100644 psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtilTest.java diff --git a/psc-flink/pom.xml b/psc-flink/pom.xml index f09e9bf1..c3f171af 100644 --- a/psc-flink/pom.xml +++ b/psc-flink/pom.xml @@ -459,7 +459,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 2.9.1 + 3.2.0 ${javadoc.opts} diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java index 45f46dc2..f3f41550 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java @@ -119,6 +119,23 @@ public class PscConnectorOptions { + "based on the number of partitions. When this option is set, the specified " + "parallelism will be used and partitions will be distributed across all readers."); + public static final ConfigOption INFER_SCAN_PARALLELISM = + ConfigOptions.key("scan.parallelism.infer") + .booleanType() + .defaultValue(false) + .withDescription( + "If false, parallelism is set by 'scan.parallelism' or Flink's default.\n" + + "If true, source parallelism is inferred from topic partition count. " + + "This option is ignored if 'scan.parallelism' is explicitly set."); + + public static final ConfigOption INFER_SCAN_PARALLELISM_MAX = + ConfigOptions.key("scan.parallelism.infer.max") + .intType() + .defaultValue(128) + .withDescription( + "Maximum inferred parallelism for scan operator when 'scan.parallelism.infer' is enabled. 
" + + "The actual parallelism will be min(partition_count, this_value)."); + // -------------------------------------------------------------------------------------------- // Psc specific options // -------------------------------------------------------------------------------------------- diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java index ebac093d..be7eddaf 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java @@ -76,6 +76,8 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_PARALLELISM; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.INFER_SCAN_PARALLELISM; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI; @@ -151,6 +153,8 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); options.add(SCAN_PARALLELISM); + options.add(INFER_SCAN_PARALLELISM); + options.add(INFER_SCAN_PARALLELISM_MAX); options.add(SOURCE_UID_PREFIX); return options; } @@ -214,7 +218,28 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); - final Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); + // Determine scan parallelism with priority: + // 1. Explicit scan.parallelism (if set) + // 2. Inferred parallelism (if inference enabled) + // 3. null (Flink default) + Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); + + if (scanParallelism == null && tableOptions.get(INFER_SCAN_PARALLELISM)) { + // Use lazy supplier to avoid metadata calls unless needed + final List topicUris = getSourceTopicUris(tableOptions); + final Properties props = properties; + scanParallelism = PscTableSourceUtil.inferParallelism( + tableOptions, + () -> { + if (topicUris == null || topicUris.isEmpty()) { + return 1; + } + // Extract cluster URI from first topic URI for metadata client + String firstTopicUri = topicUris.get(0); + String clusterUri = extractClusterUri(firstTopicUri); + return PscTableSourceUtil.getPartitionCount(props, topicUris, clusterUri); + }); + } return createPscTableSource( physicalDataType, @@ -454,4 +479,16 @@ protected PscDynamicSink creatPscTableSink( parallelism, transactionalIdPrefix); } + + /** + * Extract cluster URI from a full topic URI. 
+ * For example: "kafka://cluster-name/topic-name" -> "kafka://cluster-name" + */ + private static String extractClusterUri(String topicUri) { + int lastSlash = topicUri.lastIndexOf('/'); + if (lastSlash > 0) { + return topicUri.substring(0, lastSlash); + } + return topicUri; + } } diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtil.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtil.java new file mode 100644 index 00000000..4fb7c708 --- /dev/null +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtil.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pinterest.flink.streaming.connectors.psc.table; + +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfigurationUtils; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; +import com.pinterest.psc.metadata.TopicUriMetadata; +import com.pinterest.psc.metadata.client.PscMetadataClient; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.function.Supplier; + +/** + * Utility class for PSC table source operations, particularly parallelism inference. + * Inspired by Iceberg's SourceUtil pattern. + */ +public class PscTableSourceUtil { + + private static final Logger LOG = LoggerFactory.getLogger(PscTableSourceUtil.class); + private static final Duration METADATA_TIMEOUT = Duration.ofSeconds(30); + + private PscTableSourceUtil() { + // Utility class, no instantiation + } + + /** + * Infer source parallelism based on topic partition count, similar to Iceberg's approach. + * + *

+     * <p>The inference logic follows this priority:
+     *
+     * <ol>
+     *   <li>Start with Flink's global default parallelism from
+     *       {@link ExecutionConfigOptions#TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM}</li>
+     *   <li>If {@link PscConnectorOptions#INFER_SCAN_PARALLELISM} is enabled, use partition count</li>
+     *   <li>Cap the result at {@link PscConnectorOptions#INFER_SCAN_PARALLELISM_MAX}</li>
+     *   <li>Ensure result is at least 1</li>
+     * </ol>
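+     *
+     * <p>Worked example (illustrative numbers, not from this patch), matching the
+     * {@code Math.min} / {@code Math.max} steps in the implementation below:
+     *
+     * <pre>{@code
+     * // inference enabled, max cap 128 (the default):
+     * //   200 partitions -> min(200, 128) = 128
+     * //    30 partitions -> min(30, 128)  = 30
+     * //     0 partitions -> max(1, 0)     = 1   (the floor is applied last)
+     * }</pre>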
+ * + * @param readableConfig Flink configuration + * @param partitionCountProvider Lazy supplier of partition count (expensive metadata operation) + * @return Inferred parallelism value + */ + public static int inferParallelism( + ReadableConfig readableConfig, + Supplier partitionCountProvider) { + + // 1. Start with Flink's global default parallelism + int parallelism = readableConfig.get( + ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + + LOG.debug("Starting parallelism inference with Flink default: {}", parallelism); + + // 2. Check if inference is enabled + if (readableConfig.get(PscConnectorOptions.INFER_SCAN_PARALLELISM)) { + int maxInferParallelism = readableConfig.get( + PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX); + + // Get actual partition count (lazy evaluation to avoid unnecessary metadata calls) + int partitionCount = partitionCountProvider.get(); + LOG.debug("Partition count from provider: {}", partitionCount); + + // Use minimum of partition count and max cap + parallelism = Math.min(partitionCount, maxInferParallelism); + LOG.info("Inferred parallelism from {} partitions: {} (max cap: {})", + partitionCount, parallelism, maxInferParallelism); + } else { + LOG.debug("Parallelism inference is disabled, using default: {}", parallelism); + } + + // 3. Ensure parallelism is at least 1 + parallelism = Math.max(1, parallelism); + LOG.debug("Final inferred parallelism: {}", parallelism); + + return parallelism; + } + + /** + * Get partition count for given topic URIs by querying PSC metadata. + * + *
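+     * <p>Hypothetical call (the URIs follow the "kafka://cluster-name/topic-name" shape
+     * used as an example elsewhere in this patch; they are placeholders, not a verified
+     * PSC URI scheme):
+     *
+     * <pre>{@code
+     * int totalPartitions = PscTableSourceUtil.getPartitionCount(
+     *         pscProperties,
+     *         Arrays.asList("kafka://cluster-name/topic-a", "kafka://cluster-name/topic-b"),
+     *         "kafka://cluster-name");
+     * }</pre>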

This method creates a temporary {@link PscMetadataClient} to query topic metadata + * and counts the total number of partitions across all specified topics. + * + * @param properties PSC properties with cluster configuration + * @param topicUriStrings List of topic URI strings to query + * @param clusterUriString Cluster URI string for metadata client + * @return Total partition count across all topics + * @throws RuntimeException if metadata query fails + */ + public static int getPartitionCount( + Properties properties, + List topicUriStrings, + String clusterUriString) { + + LOG.debug("Fetching partition count for {} topics from cluster {}", + topicUriStrings.size(), clusterUriString); + + if (topicUriStrings == null || topicUriStrings.isEmpty()) { + LOG.warn("No topics specified, returning partition count of 0"); + return 0; + } + + Properties metadataProps = new Properties(); + metadataProps.putAll(properties); + + // Create temporary metadata client + try (PscMetadataClient metadataClient = new PscMetadataClient( + PscConfigurationUtils.propertiesToPscConfiguration(metadataProps))) { + + // Parse cluster URI + TopicUri baseClusterUri = TopicUri.validate(clusterUriString); + + // Parse topic URIs + Set topicUriSet = new HashSet<>(); + for (String topicUriStr : topicUriStrings) { + try { + topicUriSet.add(TopicUri.validate(topicUriStr)); + } catch (TopicUriSyntaxException e) { + LOG.warn("Failed to parse topic URI: {}, skipping", topicUriStr, e); + } + } + + if (topicUriSet.isEmpty()) { + LOG.warn("No valid topic URIs after parsing, returning partition count of 0"); + return 0; + } + + // Query metadata for all topics + Map metadata = metadataClient.describeTopicUris( + baseClusterUri, + topicUriSet, + METADATA_TIMEOUT); + + // Sum up partition counts across all topics + int totalPartitions = metadata.values().stream() + .mapToInt(m -> m.getTopicUriPartitions().size()) + .sum(); + + LOG.info("Total partitions across {} topics: {}", topicUriSet.size(), totalPartitions); + return totalPartitions; + + } catch (ConfigurationException e) { + throw new RuntimeException("Failed to create PscMetadataClient for partition count query", e); + } catch (TopicUriSyntaxException e) { + throw new RuntimeException("Failed to parse cluster URI: " + clusterUriString, e); + } catch (Exception e) { + throw new RuntimeException("Failed to fetch partition count for parallelism inference", e); + } + } +} + diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java index 93d5750b..c13e6269 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java @@ -62,6 +62,8 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_PARALLELISM; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.INFER_SCAN_PARALLELISM; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX; import static 
com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM; @@ -114,6 +116,8 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); options.add(SCAN_PARALLELISM); + options.add(INFER_SCAN_PARALLELISM); + options.add(INFER_SCAN_PARALLELISM_MAX); options.add(DELIVERY_GUARANTEE); options.add(TRANSACTIONAL_ID_PREFIX); options.add(SOURCE_UID_PREFIX); @@ -152,7 +156,28 @@ public DynamicTableSource createDynamicTableSource(Context context) { final PscConnectorOptionsUtil.BoundedOptions boundedOptions = getBoundedOptions(tableOptions); - final Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); + // Determine scan parallelism with priority: + // 1. Explicit scan.parallelism (if set) + // 2. Inferred parallelism (if inference enabled) + // 3. null (Flink default) + Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); + + if (scanParallelism == null && tableOptions.get(INFER_SCAN_PARALLELISM)) { + // Use lazy supplier to avoid metadata calls unless needed + final List topicUris = getSourceTopicUris(tableOptions); + final Properties props = properties; + scanParallelism = PscTableSourceUtil.inferParallelism( + tableOptions, + () -> { + if (topicUris == null || topicUris.isEmpty()) { + return 1; + } + // Extract cluster URI from first topic URI for metadata client + String firstTopicUri = topicUris.get(0); + String clusterUri = extractClusterUri(firstTopicUri); + return PscTableSourceUtil.getPartitionCount(props, topicUris, clusterUri); + }); + } return new PscDynamicSource( context.getPhysicalRowDataType(), @@ -433,4 +458,16 @@ public int hashCode() { return Objects.hash(innerEncodingFormat); } } + + /** + * Extract cluster URI from a full topic URI. 
+ * For example: "kafka://cluster-name/topic-name" -> "kafka://cluster-name" + */ + private static String extractClusterUri(String topicUri) { + int lastSlash = topicUri.lastIndexOf('/'); + if (lastSlash > 0) { + return topicUri.substring(0, lastSlash); + } + return topicUri; + } } diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java index 348cd3cf..33aceec3 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java @@ -1292,6 +1292,67 @@ public void testTableSourceWithParallelism() { assertThat(pscSource).isEqualTo(expectedPscSource); } + @Test + public void testTableSourceWithInferredParallelismDisabledByDefault() { + // Test that inference is disabled by default (backward compatibility) + final Map modifiedOptions = getBasicSourceOptions(); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + + // Without explicit parallelism or inference enabled, should be null + assertThat(pscSource.sourceParallelism).isNull(); + } + + @Test + public void testTableSourceExplicitParallelismOverridesInference() { + // Test that explicit parallelism takes priority over inference + // Even with inference enabled, explicit value should be used and no metadata calls made + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> { + options.put("scan.parallelism", "20"); + options.put("scan.parallelism.infer", "true"); + options.put("scan.parallelism.infer.max", "50"); + }); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + + // Verify that explicit parallelism is used, not inferred + assertThat(pscSource.sourceParallelism).isEqualTo(20); + } + + @Test + public void testTableSourceInferenceConfigOptionsAccepted() { + // Test that inference config options are recognized (doesn't validate inference logic) + // This test ensures the options are registered but doesn't trigger actual metadata calls + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> { + // Use explicit parallelism to avoid metadata calls in unit test + options.put("scan.parallelism", "15"); + // But also set inference options to verify they're accepted + options.put("scan.parallelism.infer", "false"); + options.put("scan.parallelism.infer.max", "100"); + }); + + // Should not throw validation exception + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + + // Explicit parallelism should be used + assertThat(pscSource.sourceParallelism).isEqualTo(15); + } + @Test public void testTableSinkAutoCompleteSchemaRegistrySubject() { // only format diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtilTest.java 
b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtilTest.java new file mode 100644 index 00000000..43b96090 --- /dev/null +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtilTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pinterest.flink.streaming.connectors.psc.table; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link PscTableSourceUtil}. + */ +public class PscTableSourceUtilTest { + + @Test + public void testInferParallelismWithInferenceDisabled() { + Configuration config = new Configuration(); + config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); + config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, false); + + // Supplier should not be called when inference is disabled + int parallelism = PscTableSourceUtil.inferParallelism(config, () -> { + throw new RuntimeException("Supplier should not be called"); + }); + + // Should use Flink's default parallelism + assertThat(parallelism).isEqualTo(16); + } + + @Test + public void testInferParallelismWithInferenceEnabled() { + Configuration config = new Configuration(); + config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); + config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, true); + config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX, 50); + + // Supplier returns 30 partitions + int parallelism = PscTableSourceUtil.inferParallelism(config, () -> 30); + + // Should use partition count since it's less than max + assertThat(parallelism).isEqualTo(30); + } + + @Test + public void testInferParallelismRespectsMaxCap() { + Configuration config = new Configuration(); + config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); + config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, true); + config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX, 50); + + // Supplier returns 200 partitions (more than max) + int parallelism = PscTableSourceUtil.inferParallelism(config, () -> 200); + + // Should cap at max value + assertThat(parallelism).isEqualTo(50); + } + + @Test + public void testInferParallelismWithZeroPartitions() { + Configuration config = new Configuration(); + config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); + config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, true); + config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX, 50); + + // Supplier returns 0 partitions + int parallelism = PscTableSourceUtil.inferParallelism(config, () -> 0); + + // 
Should ensure minimum of 1 + assertThat(parallelism).isEqualTo(1); + } + + @Test + public void testInferParallelismWithDefaultMax() { + Configuration config = new Configuration(); + config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); + config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, true); + // Don't set max, use default (128) + + // Supplier returns 150 partitions + int parallelism = PscTableSourceUtil.inferParallelism(config, () -> 150); + + // Should cap at default max value (128) + assertThat(parallelism).isEqualTo(128); + } + + @Test + public void testInferParallelismLazyEvaluation() { + Configuration config = new Configuration(); + config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); + config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, false); + + boolean[] supplierCalled = {false}; + + // Supplier should NOT be called when inference is disabled + int parallelism = PscTableSourceUtil.inferParallelism(config, () -> { + supplierCalled[0] = true; + return 100; + }); + + assertThat(parallelism).isEqualTo(16); + assertThat(supplierCalled[0]).isFalse(); // Verify lazy evaluation + } +} + From a009168527db829c02075f9b9f7f64927f92291b Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Thu, 23 Oct 2025 11:04:39 -0400 Subject: [PATCH 4/9] adding shuffle --- .../connectors/psc/table/PscDynamicSource.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java index c2905079..f9a9a3ac 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java @@ -310,8 +310,15 @@ public DataStream produceDataStream( pscSource, watermarkStrategy, "PscSource-" + tableIdentifier); // Apply explicit parallelism if configured + // Note: Setting parallelism higher than partition count requires rebalance() + // to distribute data across all downstream operators, otherwise excess + // source operators remain idle due to FLIP-27 split assignment. + DataStream resultStream = sourceStream; if (sourceParallelism != null && sourceParallelism > 0) { sourceStream.setParallelism(sourceParallelism); + // Add rebalance to redistribute data across all parallel instances + // This ensures downstream operators can utilize the increased parallelism + resultStream = sourceStream.rebalance(); } // Prefer explicit user-provided UID prefix if present; otherwise rely on provider context. 
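For context, a minimal standalone DataStream sketch of the shuffle wired up in the hunk
above; it is illustrative only and not part of the patch. With a FLIP-27 source, subtasks
beyond the split count emit nothing, and an equal-parallelism downstream operator is
connected FORWARD by default, so its matching subtasks would starve; rebalance() instead
distributes records round-robin across all downstream subtasks:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RebalanceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8); // analogous to an explicit sourceParallelism of 8

            // Runnable stand-in for the PSC source; in the patch, the number of active
            // source subtasks is capped by the topic's partition count.
            DataStream<Long> source = env.fromSequence(0, 999);

            // rebalance() breaks the FORWARD connection and round-robins records, so
            // every downstream subtask receives data even when only a few upstream
            // subtasks actually produce anything.
            source.rebalance()
                    .map(n -> n * 2)
                    .print();

            env.execute("rebalance-sketch");
        }
    }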
@@ -319,11 +326,11 @@ public DataStream produceDataStream( final String trimmedPrefix = sourceUidPrefix.trim(); if (!trimmedPrefix.isEmpty()) { sourceStream.uid(trimmedPrefix + ":" + PSC_TRANSFORMATION + ":" + tableIdentifier); - return sourceStream; + return resultStream; } } providerContext.generateUid(PSC_TRANSFORMATION).ifPresent(sourceStream::uid); - return sourceStream; + return resultStream; } @Override From 2569d42b36bc97c278e73856c506adfda9c7c800 Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Thu, 23 Oct 2025 20:11:18 -0400 Subject: [PATCH 5/9] revert pom --- psc-flink/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/psc-flink/pom.xml b/psc-flink/pom.xml index c3f171af..f09e9bf1 100644 --- a/psc-flink/pom.xml +++ b/psc-flink/pom.xml @@ -459,7 +459,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.2.0 + 2.9.1 ${javadoc.opts} From 40f54468dae7287d6c72bcb92d20142a4282deaf Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Sun, 26 Oct 2025 15:06:06 -0400 Subject: [PATCH 6/9] removing inference and keeping boolean flag --- .../psc/table/PscConnectorOptions.java | 29 +-- .../psc/table/PscDynamicSource.java | 33 ++-- .../psc/table/PscDynamicTableFactory.java | 49 +---- .../psc/table/PscTableSourceUtil.java | 174 ------------------ .../table/UpsertPscDynamicTableFactory.java | 45 +---- .../psc/table/PscDynamicTableFactoryTest.java | 101 +--------- .../psc/table/PscTableSourceUtilTest.java | 121 ------------ 7 files changed, 34 insertions(+), 518 deletions(-) delete mode 100644 psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtil.java delete mode 100644 psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtilTest.java diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java index f3f41550..3e96c945 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java @@ -109,32 +109,15 @@ public class PscConnectorOptions { public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; - public static final ConfigOption SCAN_PARALLELISM = - ConfigOptions.key("scan.parallelism") - .intType() - .noDefaultValue() - .withDescription( - "Defines a custom parallelism for the scan source. " - + "By default, if this option is not defined, the planner will derive the parallelism " - + "based on the number of partitions. When this option is set, the specified " - + "parallelism will be used and partitions will be distributed across all readers."); - - public static final ConfigOption INFER_SCAN_PARALLELISM = - ConfigOptions.key("scan.parallelism.infer") + public static final ConfigOption SCAN_ENABLE_RESCALE = + ConfigOptions.key("scan.enable-rescale") .booleanType() .defaultValue(false) .withDescription( - "If false, parallelism is set by 'scan.parallelism' or Flink's default.\n" - + "If true, source parallelism is inferred from topic partition count. 
" - + "This option is ignored if 'scan.parallelism' is explicitly set."); - - public static final ConfigOption INFER_SCAN_PARALLELISM_MAX = - ConfigOptions.key("scan.parallelism.infer.max") - .intType() - .defaultValue(128) - .withDescription( - "Maximum inferred parallelism for scan operator when 'scan.parallelism.infer' is enabled. " - + "The actual parallelism will be min(partition_count, this_value)."); + "Enable rescale() shuffle to redistribute data when topic partitions " + + "are fewer than downstream parallelism. This allows downstream operators " + + "to utilize higher parallelism than partition count. Default: false to " + + "avoid unnecessary shuffle overhead when partitions >= pipeline parallelism."); // -------------------------------------------------------------------------------------------- // Psc specific options diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java index f9a9a3ac..bad78fa6 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java @@ -169,8 +169,8 @@ public class PscDynamicSource /** Optional user-provided UID prefix for stabilizing operator UIDs across DAG changes. */ protected final @Nullable String sourceUidPrefix; - /** Optional parallelism for the source operator. If not set, uses Flink's default. */ - protected final @Nullable Integer sourceParallelism; + /** Enable rescale() shuffle to redistribute data across downstream operators. */ + protected final boolean enableRescale; public PscDynamicSource( DataType physicalDataType, @@ -191,7 +191,7 @@ public PscDynamicSource( boolean upsertMode, String tableIdentifier, @Nullable String sourceUidPrefix, - @Nullable Integer sourceParallelism) { + boolean enableRescale) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -232,12 +232,12 @@ public PscDynamicSource( this.upsertMode = upsertMode; this.tableIdentifier = tableIdentifier; this.sourceUidPrefix = sourceUidPrefix; - this.sourceParallelism = sourceParallelism; + this.enableRescale = enableRescale; } /** - * Backward-compatible constructor without UID prefix and parallelism. Delegates to the full - * constructor with null {@code sourceUidPrefix} and {@code sourceParallelism}. + * Backward-compatible constructor without UID prefix and rescale. Delegates to the full + * constructor with null {@code sourceUidPrefix} and false {@code enableRescale}. */ public PscDynamicSource( DataType physicalDataType, @@ -276,7 +276,7 @@ public PscDynamicSource( upsertMode, tableIdentifier, null, - null); + false); } @Override @@ -309,16 +309,11 @@ public DataStream produceDataStream( execEnv.fromSource( pscSource, watermarkStrategy, "PscSource-" + tableIdentifier); - // Apply explicit parallelism if configured - // Note: Setting parallelism higher than partition count requires rebalance() - // to distribute data across all downstream operators, otherwise excess - // source operators remain idle due to FLIP-27 split assignment. 
+ // Let Flink handle source parallelism automatically (defaults to partition count) + // Only add rescale if explicitly enabled to redistribute data across downstream operators DataStream resultStream = sourceStream; - if (sourceParallelism != null && sourceParallelism > 0) { - sourceStream.setParallelism(sourceParallelism); - // Add rebalance to redistribute data across all parallel instances - // This ensures downstream operators can utilize the increased parallelism - resultStream = sourceStream.rebalance(); + if (enableRescale) { + resultStream = sourceStream.rescale(); } // Prefer explicit user-provided UID prefix if present; otherwise rely on provider context. @@ -416,7 +411,7 @@ public DynamicTableSource copy() { upsertMode, tableIdentifier, sourceUidPrefix, - sourceParallelism); + enableRescale); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; @@ -457,7 +452,7 @@ public boolean equals(Object o) { && Objects.equals(upsertMode, that.upsertMode) && Objects.equals(tableIdentifier, that.tableIdentifier) && Objects.equals(sourceUidPrefix, that.sourceUidPrefix) - && Objects.equals(sourceParallelism, that.sourceParallelism) + && enableRescale == that.enableRescale && Objects.equals(watermarkStrategy, that.watermarkStrategy); } @@ -484,7 +479,7 @@ public int hashCode() { upsertMode, tableIdentifier, sourceUidPrefix, - sourceParallelism, + enableRescale, watermarkStrategy); } diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java index be7eddaf..9cda26d9 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java @@ -75,9 +75,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; -import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_PARALLELISM; -import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.INFER_SCAN_PARALLELISM; -import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_ENABLE_RESCALE; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI; @@ -152,9 +150,7 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_MODE); options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); - options.add(SCAN_PARALLELISM); - options.add(INFER_SCAN_PARALLELISM); - options.add(INFER_SCAN_PARALLELISM_MAX); + options.add(SCAN_ENABLE_RESCALE); options.add(SOURCE_UID_PREFIX); return options; } @@ -218,28 +214,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); - // Determine scan parallelism with priority: - // 1. Explicit scan.parallelism (if set) - // 2. Inferred parallelism (if inference enabled) - // 3. null (Flink default) - Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); - - if (scanParallelism == null && tableOptions.get(INFER_SCAN_PARALLELISM)) { - // Use lazy supplier to avoid metadata calls unless needed - final List topicUris = getSourceTopicUris(tableOptions); - final Properties props = properties; - scanParallelism = PscTableSourceUtil.inferParallelism( - tableOptions, - () -> { - if (topicUris == null || topicUris.isEmpty()) { - return 1; - } - // Extract cluster URI from first topic URI for metadata client - String firstTopicUri = topicUris.get(0); - String clusterUri = extractClusterUri(firstTopicUri); - return PscTableSourceUtil.getPartitionCount(props, topicUris, clusterUri); - }); - } + final boolean enableRescale = tableOptions.get(SCAN_ENABLE_RESCALE); return createPscTableSource( physicalDataType, @@ -259,7 +234,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.boundedTimestampMillis, context.getObjectIdentifier().asSummaryString(), tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), - scanParallelism); + enableRescale); } @Override @@ -426,7 +401,7 @@ protected PscDynamicSource createPscTableSource( long endTimestampMillis, String tableIdentifier, @Nullable String sourceUidPrefix, - @Nullable Integer scanParallelism) { + boolean enableRescale) { return new PscDynamicSource( physicalDataType, keyDecodingFormat, @@ -446,7 +421,7 @@ protected PscDynamicSource createPscTableSource( false, tableIdentifier, sourceUidPrefix, - scanParallelism); + enableRescale); } protected PscDynamicSink creatPscTableSink( @@ -479,16 +454,4 @@ protected PscDynamicSink creatPscTableSink( parallelism, transactionalIdPrefix); } - - /** - * Extract cluster URI from a full topic URI. - * For example: "kafka://cluster-name/topic-name" -> "kafka://cluster-name" - */ - private static String extractClusterUri(String topicUri) { - int lastSlash = topicUri.lastIndexOf('/'); - if (lastSlash > 0) { - return topicUri.substring(0, lastSlash); - } - return topicUri; - } } diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtil.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtil.java deleted file mode 100644 index 4fb7c708..00000000 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtil.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.pinterest.flink.streaming.connectors.psc.table; - -import com.pinterest.psc.common.TopicUri; -import com.pinterest.psc.config.PscConfigurationUtils; -import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.exception.startup.TopicUriSyntaxException; -import com.pinterest.psc.metadata.TopicUriMetadata; -import com.pinterest.psc.metadata.client.PscMetadataClient; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.function.Supplier; - -/** - * Utility class for PSC table source operations, particularly parallelism inference. - * Inspired by Iceberg's SourceUtil pattern. - */ -public class PscTableSourceUtil { - - private static final Logger LOG = LoggerFactory.getLogger(PscTableSourceUtil.class); - private static final Duration METADATA_TIMEOUT = Duration.ofSeconds(30); - - private PscTableSourceUtil() { - // Utility class, no instantiation - } - - /** - * Infer source parallelism based on topic partition count, similar to Iceberg's approach. - * - *

-     * <p>The inference logic follows this priority:
-     *
-     * <ol>
-     *   <li>Start with Flink's global default parallelism from
-     *       {@link ExecutionConfigOptions#TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM}</li>
-     *   <li>If {@link PscConnectorOptions#INFER_SCAN_PARALLELISM} is enabled, use partition count</li>
-     *   <li>Cap the result at {@link PscConnectorOptions#INFER_SCAN_PARALLELISM_MAX}</li>
-     *   <li>Ensure result is at least 1</li>
-     * </ol>
- * - * @param readableConfig Flink configuration - * @param partitionCountProvider Lazy supplier of partition count (expensive metadata operation) - * @return Inferred parallelism value - */ - public static int inferParallelism( - ReadableConfig readableConfig, - Supplier partitionCountProvider) { - - // 1. Start with Flink's global default parallelism - int parallelism = readableConfig.get( - ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); - - LOG.debug("Starting parallelism inference with Flink default: {}", parallelism); - - // 2. Check if inference is enabled - if (readableConfig.get(PscConnectorOptions.INFER_SCAN_PARALLELISM)) { - int maxInferParallelism = readableConfig.get( - PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX); - - // Get actual partition count (lazy evaluation to avoid unnecessary metadata calls) - int partitionCount = partitionCountProvider.get(); - LOG.debug("Partition count from provider: {}", partitionCount); - - // Use minimum of partition count and max cap - parallelism = Math.min(partitionCount, maxInferParallelism); - LOG.info("Inferred parallelism from {} partitions: {} (max cap: {})", - partitionCount, parallelism, maxInferParallelism); - } else { - LOG.debug("Parallelism inference is disabled, using default: {}", parallelism); - } - - // 3. Ensure parallelism is at least 1 - parallelism = Math.max(1, parallelism); - LOG.debug("Final inferred parallelism: {}", parallelism); - - return parallelism; - } - - /** - * Get partition count for given topic URIs by querying PSC metadata. - * - *

This method creates a temporary {@link PscMetadataClient} to query topic metadata - * and counts the total number of partitions across all specified topics. - * - * @param properties PSC properties with cluster configuration - * @param topicUriStrings List of topic URI strings to query - * @param clusterUriString Cluster URI string for metadata client - * @return Total partition count across all topics - * @throws RuntimeException if metadata query fails - */ - public static int getPartitionCount( - Properties properties, - List topicUriStrings, - String clusterUriString) { - - LOG.debug("Fetching partition count for {} topics from cluster {}", - topicUriStrings.size(), clusterUriString); - - if (topicUriStrings == null || topicUriStrings.isEmpty()) { - LOG.warn("No topics specified, returning partition count of 0"); - return 0; - } - - Properties metadataProps = new Properties(); - metadataProps.putAll(properties); - - // Create temporary metadata client - try (PscMetadataClient metadataClient = new PscMetadataClient( - PscConfigurationUtils.propertiesToPscConfiguration(metadataProps))) { - - // Parse cluster URI - TopicUri baseClusterUri = TopicUri.validate(clusterUriString); - - // Parse topic URIs - Set topicUriSet = new HashSet<>(); - for (String topicUriStr : topicUriStrings) { - try { - topicUriSet.add(TopicUri.validate(topicUriStr)); - } catch (TopicUriSyntaxException e) { - LOG.warn("Failed to parse topic URI: {}, skipping", topicUriStr, e); - } - } - - if (topicUriSet.isEmpty()) { - LOG.warn("No valid topic URIs after parsing, returning partition count of 0"); - return 0; - } - - // Query metadata for all topics - Map metadata = metadataClient.describeTopicUris( - baseClusterUri, - topicUriSet, - METADATA_TIMEOUT); - - // Sum up partition counts across all topics - int totalPartitions = metadata.values().stream() - .mapToInt(m -> m.getTopicUriPartitions().size()) - .sum(); - - LOG.info("Total partitions across {} topics: {}", topicUriSet.size(), totalPartitions); - return totalPartitions; - - } catch (ConfigurationException e) { - throw new RuntimeException("Failed to create PscMetadataClient for partition count query", e); - } catch (TopicUriSyntaxException e) { - throw new RuntimeException("Failed to parse cluster URI: " + clusterUriString, e); - } catch (Exception e) { - throw new RuntimeException("Failed to fetch partition count for parallelism inference", e); - } - } -} - diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java index c13e6269..2adb0ef0 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java @@ -61,9 +61,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_MODE; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; -import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_PARALLELISM; -import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.INFER_SCAN_PARALLELISM; -import static 
com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_ENABLE_RESCALE; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM; @@ -115,9 +113,7 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_MODE); options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); - options.add(SCAN_PARALLELISM); - options.add(INFER_SCAN_PARALLELISM); - options.add(INFER_SCAN_PARALLELISM_MAX); + options.add(SCAN_ENABLE_RESCALE); options.add(DELIVERY_GUARANTEE); options.add(TRANSACTIONAL_ID_PREFIX); options.add(SOURCE_UID_PREFIX); @@ -156,28 +152,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final PscConnectorOptionsUtil.BoundedOptions boundedOptions = getBoundedOptions(tableOptions); - // Determine scan parallelism with priority: - // 1. Explicit scan.parallelism (if set) - // 2. Inferred parallelism (if inference enabled) - // 3. null (Flink default) - Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); - - if (scanParallelism == null && tableOptions.get(INFER_SCAN_PARALLELISM)) { - // Use lazy supplier to avoid metadata calls unless needed - final List topicUris = getSourceTopicUris(tableOptions); - final Properties props = properties; - scanParallelism = PscTableSourceUtil.inferParallelism( - tableOptions, - () -> { - if (topicUris == null || topicUris.isEmpty()) { - return 1; - } - // Extract cluster URI from first topic URI for metadata client - String firstTopicUri = topicUris.get(0); - String clusterUri = extractClusterUri(firstTopicUri); - return PscTableSourceUtil.getPartitionCount(props, topicUris, clusterUri); - }); - } + final boolean enableRescale = tableOptions.get(SCAN_ENABLE_RESCALE); return new PscDynamicSource( context.getPhysicalRowDataType(), @@ -198,7 +173,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { true, context.getObjectIdentifier().asSummaryString(), tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), - scanParallelism); + enableRescale); } @Override @@ -458,16 +433,4 @@ public int hashCode() { return Objects.hash(innerEncodingFormat); } } - - /** - * Extract cluster URI from a full topic URI. 
- * For example: "kafka://cluster-name/topic-name" -> "kafka://cluster-name" - */ - private static String extractClusterUri(String topicUri) { - int lastSlash = topicUri.lastIndexOf('/'); - if (lastSlash > 0) { - return topicUri.substring(0, lastSlash); - } - return topicUri; - } } diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java index 33aceec3..6eef1e55 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java @@ -1247,110 +1247,17 @@ public void testTableSinkWithParallelism() { } @Test - public void testTableSourceWithParallelism() { - final Map modifiedOptions = - getModifiedOptions( - getBasicSourceOptions(), options -> options.put("scan.parallelism", "10")); - - final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); - assertThat(actualSource).isInstanceOf(PscDynamicSource.class); - - final PscDynamicSource pscSource = (PscDynamicSource) actualSource; - - // Verify the parallelism is stored in the source - assertThat(pscSource.sourceParallelism).isEqualTo(10); - - // Verify that the source equals the expected source with parallelism set - final Map specificOffsets = new HashMap<>(); - specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_0), OFFSET_0); - specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_1), OFFSET_1); - - final DecodingFormat> valueDecodingFormat = - new DecodingFormatMock(",", true); - - final PscDynamicSource expectedPscSource = - new PscDynamicSource( - SCHEMA_DATA_TYPE, - null, - valueDecodingFormat, - new int[0], - new int[] {0, 1, 2}, - null, - Collections.singletonList(TOPIC_URI), - null, - PSC_SOURCE_PROPERTIES, - StartupMode.SPECIFIC_OFFSETS, - specificOffsets, - 0, - BoundedMode.UNBOUNDED, - Collections.emptyMap(), - 0, - false, - FactoryMocks.IDENTIFIER.asSummaryString(), - null, - 10); - assertThat(pscSource).isEqualTo(expectedPscSource); - } - - @Test - public void testTableSourceWithInferredParallelismDisabledByDefault() { - // Test that inference is disabled by default (backward compatibility) - final Map modifiedOptions = getBasicSourceOptions(); - - final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); - assertThat(actualSource).isInstanceOf(PscDynamicSource.class); - - final PscDynamicSource pscSource = (PscDynamicSource) actualSource; - - // Without explicit parallelism or inference enabled, should be null - assertThat(pscSource.sourceParallelism).isNull(); - } - - @Test - public void testTableSourceExplicitParallelismOverridesInference() { - // Test that explicit parallelism takes priority over inference - // Even with inference enabled, explicit value should be used and no metadata calls made + public void testTableSourceWithRescale() { final Map modifiedOptions = getModifiedOptions( getBasicSourceOptions(), - options -> { - options.put("scan.parallelism", "20"); - options.put("scan.parallelism.infer", "true"); - options.put("scan.parallelism.infer.max", "50"); - }); + options -> options.put("scan.enable-rescale", "true")); final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); assertThat(actualSource).isInstanceOf(PscDynamicSource.class); final PscDynamicSource pscSource = 
(PscDynamicSource) actualSource; - - // Verify that explicit parallelism is used, not inferred - assertThat(pscSource.sourceParallelism).isEqualTo(20); - } - - @Test - public void testTableSourceInferenceConfigOptionsAccepted() { - // Test that inference config options are recognized (doesn't validate inference logic) - // This test ensures the options are registered but doesn't trigger actual metadata calls - final Map modifiedOptions = - getModifiedOptions( - getBasicSourceOptions(), - options -> { - // Use explicit parallelism to avoid metadata calls in unit test - options.put("scan.parallelism", "15"); - // But also set inference options to verify they're accepted - options.put("scan.parallelism.infer", "false"); - options.put("scan.parallelism.infer.max", "100"); - }); - - // Should not throw validation exception - final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); - assertThat(actualSource).isInstanceOf(PscDynamicSource.class); - - final PscDynamicSource pscSource = (PscDynamicSource) actualSource; - - // Explicit parallelism should be used - assertThat(pscSource.sourceParallelism).isEqualTo(15); + assertThat(pscSource.enableRescale).isTrue(); } @Test @@ -1838,7 +1745,7 @@ private static PscDynamicSource createExpectedScanSource( false, FactoryMocks.IDENTIFIER.asSummaryString(), null, - null); + false); } private static PscDynamicSink createExpectedSink( diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtilTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtilTest.java deleted file mode 100644 index 43b96090..00000000 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableSourceUtilTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.pinterest.flink.streaming.connectors.psc.table; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.junit.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * Unit tests for {@link PscTableSourceUtil}. 
- */ -public class PscTableSourceUtilTest { - - @Test - public void testInferParallelismWithInferenceDisabled() { - Configuration config = new Configuration(); - config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); - config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, false); - - // Supplier should not be called when inference is disabled - int parallelism = PscTableSourceUtil.inferParallelism(config, () -> { - throw new RuntimeException("Supplier should not be called"); - }); - - // Should use Flink's default parallelism - assertThat(parallelism).isEqualTo(16); - } - - @Test - public void testInferParallelismWithInferenceEnabled() { - Configuration config = new Configuration(); - config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); - config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, true); - config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX, 50); - - // Supplier returns 30 partitions - int parallelism = PscTableSourceUtil.inferParallelism(config, () -> 30); - - // Should use partition count since it's less than max - assertThat(parallelism).isEqualTo(30); - } - - @Test - public void testInferParallelismRespectsMaxCap() { - Configuration config = new Configuration(); - config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); - config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, true); - config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX, 50); - - // Supplier returns 200 partitions (more than max) - int parallelism = PscTableSourceUtil.inferParallelism(config, () -> 200); - - // Should cap at max value - assertThat(parallelism).isEqualTo(50); - } - - @Test - public void testInferParallelismWithZeroPartitions() { - Configuration config = new Configuration(); - config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); - config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, true); - config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM_MAX, 50); - - // Supplier returns 0 partitions - int parallelism = PscTableSourceUtil.inferParallelism(config, () -> 0); - - // Should ensure minimum of 1 - assertThat(parallelism).isEqualTo(1); - } - - @Test - public void testInferParallelismWithDefaultMax() { - Configuration config = new Configuration(); - config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); - config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, true); - // Don't set max, use default (128) - - // Supplier returns 150 partitions - int parallelism = PscTableSourceUtil.inferParallelism(config, () -> 150); - - // Should cap at default max value (128) - assertThat(parallelism).isEqualTo(128); - } - - @Test - public void testInferParallelismLazyEvaluation() { - Configuration config = new Configuration(); - config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 16); - config.set(PscConnectorOptions.INFER_SCAN_PARALLELISM, false); - - boolean[] supplierCalled = {false}; - - // Supplier should NOT be called when inference is disabled - int parallelism = PscTableSourceUtil.inferParallelism(config, () -> { - supplierCalled[0] = true; - return 100; - }); - - assertThat(parallelism).isEqualTo(16); - assertThat(supplierCalled[0]).isFalse(); // Verify lazy evaluation - } -} - From ace54aacbd2db4e1fbaec05888e51b198ffb1bb3 Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Tue, 28 Oct 2025 11:05:07 -0400 Subject: [PATCH 7/9] rework rescale apply logic --- .../psc/table/PscConnectorOptions.java | 10 +- 
.../psc/table/PscDynamicTableFactory.java | 132 ++++++++++++++++- .../table/UpsertPscDynamicTableFactory.java | 140 +++++++++++++++++- .../psc/table/PscDynamicTableFactoryTest.java | 25 +++- 4 files changed, 296 insertions(+), 11 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java index 3e96c945..bd8e5abc 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java @@ -114,10 +114,12 @@ public class PscConnectorOptions { .booleanType() .defaultValue(false) .withDescription( - "Enable rescale() shuffle to redistribute data when topic partitions " + - "are fewer than downstream parallelism. This allows downstream operators " + - "to utilize higher parallelism than partition count. Default: false to " + - "avoid unnecessary shuffle overhead when partitions >= pipeline parallelism."); + "Enable smart rescale() to redistribute data when needed. When enabled, the connector " + + "automatically compares the configured pipeline parallelism (table.exec.resource.default-parallelism) " + + "with the topic's partition count. Rescale is applied ONLY when parallelism > partition count, " + + "avoiding unnecessary shuffle overhead otherwise. This allows downstream operators to fully utilize " + + "higher parallelism than the source partition count. " + + "Default: false (no automatic shuffling)."); // -------------------------------------------------------------------------------------------- // Psc specific options diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java index 9cda26d9..694fcdaf 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java @@ -23,6 +23,10 @@ import com.pinterest.flink.streaming.connectors.psc.config.StartupMode; import com.pinterest.flink.streaming.connectors.psc.internals.PscTopicUriPartition; import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkPscPartitioner; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.metadata.TopicUriMetadata; +import com.pinterest.psc.metadata.client.PscMetadataClient; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -32,6 +36,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; @@ -214,7 +219,12 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); - final boolean enableRescale = 
tableOptions.get(SCAN_ENABLE_RESCALE); + // Determine if rescale should be applied based on parallelism vs partition count + final boolean shouldRescale = shouldApplyRescale( + tableOptions, + context.getConfiguration(), + getSourceTopicUris(tableOptions), + properties); return createPscTableSource( physicalDataType, @@ -234,7 +244,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.boundedTimestampMillis, context.getObjectIdentifier().asSummaryString(), tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), - enableRescale); + shouldRescale); } @Override @@ -383,6 +393,124 @@ private static DeliveryGuarantee validateDeprecatedSemantic(ReadableConfig table // -------------------------------------------------------------------------------------------- + /** + * Determines whether rescale() should be applied based on: + * 1. scan.enable-rescale flag must be true + * 2. Global parallelism (table.exec.resource.default-parallelism) > partition count + * + * This automatically avoids unnecessary shuffle overhead when partitions >= parallelism. + */ + private boolean shouldApplyRescale( + ReadableConfig tableOptions, + ReadableConfig globalConfig, + List topicUris, + Properties pscProperties) { + + // First check if rescale is enabled by user + if (!tableOptions.get(SCAN_ENABLE_RESCALE)) { + return false; + } + + // Get the global default parallelism + int defaultParallelism = globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + + // If parallelism is -1 (not set) or <= 0, cannot determine, so don't apply rescale + if (defaultParallelism <= 0) { + LOG.info("scan.enable-rescale is true, but table.exec.resource.default-parallelism is {} (not set). " + + "Rescale will not be applied. Set a positive parallelism value to enable automatic rescale.", + defaultParallelism); + return false; + } + + // Query partition count from PSC metadata + int partitionCount = getTopicPartitionCount(topicUris, pscProperties); + + // If partition count couldn't be determined, don't apply rescale (fail-safe) + if (partitionCount <= 0) { + LOG.warn("scan.enable-rescale is true, but partition count could not be determined. " + + "Rescale will not be applied."); + return false; + } + + // Apply rescale only if user-configured parallelism exceeds partition count + boolean shouldRescale = defaultParallelism > partitionCount; + + if (shouldRescale) { + LOG.info("Applying rescale(): configured parallelism ({}) > partition count ({}). " + + "Data will be redistributed to fully utilize downstream operators.", + defaultParallelism, partitionCount); + } else { + LOG.info("Skipping rescale(): configured parallelism ({}) <= partition count ({}). " + + "No shuffle needed as source will naturally match or exceed desired parallelism.", + defaultParallelism, partitionCount); + } + + return shouldRescale; + } + + /** + * Queries the minimum partition count across all specified topic URIs. + * Returns -1 if partition count cannot be determined. 
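+     * <p>The minimum (rather than the total) is used deliberately: for multi-topic
+     * sources, the topic with the fewest partitions is the bottleneck that the
+     * rescale decision has to respect.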
+ */ + private int getTopicPartitionCount(List topicUris, Properties pscProperties) { + if (topicUris == null || topicUris.isEmpty()) { + LOG.warn("No topic URIs provided for partition count query."); + return -1; + } + + PscMetadataClient metadataClient = null; + try { + // Create PSC configuration from properties + PscConfiguration pscConfig = new PscConfiguration(); + for (String key : pscProperties.stringPropertyNames()) { + pscConfig.setProperty(key, pscProperties.getProperty(key)); + } + + metadataClient = new PscMetadataClient(pscConfig); + + int minPartitionCount = Integer.MAX_VALUE; + + for (String topicUriStr : topicUris) { + try { + TopicUri topicUri = TopicUri.validate(topicUriStr); + + // Query metadata for this topic + Map metadataMap = metadataClient.describeTopicUris( + topicUri, // cluster URI (can use full topic URI) + java.util.Collections.singleton(topicUri), + Duration.ofSeconds(10)); + + TopicUriMetadata metadata = metadataMap.get(topicUri); + if (metadata != null) { + int partitionCount = metadata.getTopicUriPartitions().size(); + LOG.debug("Topic {} has {} partitions", topicUriStr, partitionCount); + minPartitionCount = Math.min(minPartitionCount, partitionCount); + } else { + LOG.warn("No metadata returned for topic {}", topicUriStr); + } + } catch (Exception e) { + LOG.warn("Failed to query partition count for topic {}: {}", + topicUriStr, e.getMessage()); + } + } + + return (minPartitionCount == Integer.MAX_VALUE) ? -1 : minPartitionCount; + + } catch (Exception e) { + LOG.warn("Failed to create PSC metadata client or query partition count: {}", + e.getMessage()); + return -1; + } finally { + if (metadataClient != null) { + try { + metadataClient.close(); + } catch (Exception e) { + LOG.warn("Failed to close PSC metadata client: {}", e.getMessage()); + } + } + } + } + protected PscDynamicSource createPscTableSource( DataType physicalDataType, @Nullable DecodingFormat> keyDecodingFormat, diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java index 2adb0ef0..c2f91cca 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java @@ -19,6 +19,10 @@ package com.pinterest.flink.streaming.connectors.psc.table; import com.pinterest.flink.streaming.connectors.psc.config.StartupMode; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.metadata.TopicUriMetadata; +import com.pinterest.psc.metadata.client.PscMetadataClient; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.java.tuple.Tuple2; @@ -27,6 +31,9 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; @@ -86,6 +93,8 @@ public class UpsertPscDynamicTableFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory { + private static final Logger LOG = LoggerFactory.getLogger(UpsertPscDynamicTableFactory.class); + public static final String IDENTIFIER = "upsert-psc"; @Override @@ -152,7 +161,12 @@ public DynamicTableSource createDynamicTableSource(Context context) { final PscConnectorOptionsUtil.BoundedOptions boundedOptions = getBoundedOptions(tableOptions); - final boolean enableRescale = tableOptions.get(SCAN_ENABLE_RESCALE); + // Determine if rescale should be applied based on parallelism vs partition count + final boolean shouldRescale = shouldApplyRescale( + tableOptions, + context.getConfiguration(), + getSourceTopicUris(tableOptions), + properties); return new PscDynamicSource( context.getPhysicalRowDataType(), @@ -173,7 +187,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { true, context.getObjectIdentifier().asSummaryString(), tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), - enableRescale); + shouldRescale); } @Override @@ -433,4 +447,126 @@ public int hashCode() { return Objects.hash(innerEncodingFormat); } } + + // -------------------------------------------------------------------------------------------- + // Helper methods for smart rescale logic + // -------------------------------------------------------------------------------------------- + + /** + * Determines whether rescale() should be applied based on: + * 1. scan.enable-rescale flag must be true + * 2. Global parallelism (table.exec.resource.default-parallelism) > partition count + * + * This automatically avoids unnecessary shuffle overhead when partitions >= parallelism. + */ + private boolean shouldApplyRescale( + ReadableConfig tableOptions, + ReadableConfig globalConfig, + List topicUris, + Properties pscProperties) { + + // First check if rescale is enabled by user + if (!tableOptions.get(SCAN_ENABLE_RESCALE)) { + return false; + } + + // Get the global default parallelism + int defaultParallelism = globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + + // If parallelism is -1 (not set) or <= 0, cannot determine, so don't apply rescale + if (defaultParallelism <= 0) { + LOG.info("scan.enable-rescale is true, but table.exec.resource.default-parallelism is {} (not set). " + + "Rescale will not be applied. Set a positive parallelism value to enable automatic rescale.", + defaultParallelism); + return false; + } + + // Query partition count from PSC metadata + int partitionCount = getTopicPartitionCount(topicUris, pscProperties); + + // If partition count couldn't be determined, don't apply rescale (fail-safe) + if (partitionCount <= 0) { + LOG.warn("scan.enable-rescale is true, but partition count could not be determined. " + + "Rescale will not be applied."); + return false; + } + + // Apply rescale only if user-configured parallelism exceeds partition count + boolean shouldRescale = defaultParallelism > partitionCount; + + if (shouldRescale) { + LOG.info("Applying rescale(): configured parallelism ({}) > partition count ({}). " + + "Data will be redistributed to fully utilize downstream operators.", + defaultParallelism, partitionCount); + } else { + LOG.info("Skipping rescale(): configured parallelism ({}) <= partition count ({}). " + + "No shuffle needed as source will naturally match or exceed desired parallelism.", + defaultParallelism, partitionCount); + } + + return shouldRescale; + } + + /** + * Queries the minimum partition count across all specified topic URIs. 
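+     * For multi-topic sources, the minimum is a conservative choice: the topic with
+     * the fewest partitions bounds the effective source parallelism.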
+ * Returns -1 if partition count cannot be determined. + */ + private int getTopicPartitionCount(List topicUris, Properties pscProperties) { + if (topicUris == null || topicUris.isEmpty()) { + LOG.warn("No topic URIs provided for partition count query."); + return -1; + } + + PscMetadataClient metadataClient = null; + try { + // Create PSC configuration from properties + PscConfiguration pscConfig = new PscConfiguration(); + for (String key : pscProperties.stringPropertyNames()) { + pscConfig.setProperty(key, pscProperties.getProperty(key)); + } + + metadataClient = new PscMetadataClient(pscConfig); + + int minPartitionCount = Integer.MAX_VALUE; + + for (String topicUriStr : topicUris) { + try { + TopicUri topicUri = TopicUri.validate(topicUriStr); + + // Query metadata for this topic + java.util.Map metadataMap = metadataClient.describeTopicUris( + topicUri, // cluster URI (can use full topic URI) + java.util.Collections.singleton(topicUri), + java.time.Duration.ofSeconds(10)); + + TopicUriMetadata metadata = metadataMap.get(topicUri); + if (metadata != null) { + int partitionCount = metadata.getTopicUriPartitions().size(); + LOG.debug("Topic {} has {} partitions", topicUriStr, partitionCount); + minPartitionCount = Math.min(minPartitionCount, partitionCount); + } else { + LOG.warn("No metadata returned for topic {}", topicUriStr); + } + } catch (Exception e) { + LOG.warn("Failed to query partition count for topic {}: {}", + topicUriStr, e.getMessage()); + } + } + + return (minPartitionCount == Integer.MAX_VALUE) ? -1 : minPartitionCount; + + } catch (Exception e) { + LOG.warn("Failed to create PSC metadata client or query partition count: {}", + e.getMessage()); + return -1; + } finally { + if (metadataClient != null) { + try { + metadataClient.close(); + } catch (Exception e) { + LOG.warn("Failed to close PSC metadata client: {}", e.getMessage()); + } + } + } + } } diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java index 6eef1e55..13b5a6ea 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java @@ -1247,7 +1247,21 @@ public void testTableSinkWithParallelism() { } @Test - public void testTableSourceWithRescale() { + public void testTableSourceWithRescaleDisabled() { + // When scan.enable-rescale is false (default), rescale should not be applied + final Map modifiedOptions = getBasicSourceOptions(); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.enableRescale).isFalse(); + } + + @Test + public void testTableSourceWithRescaleEnabled() { + // When scan.enable-rescale is true but parallelism is not configured, + // rescale should not be applied (fail-safe behavior) final Map modifiedOptions = getModifiedOptions( getBasicSourceOptions(), @@ -1256,8 +1270,13 @@ public void testTableSourceWithRescale() { final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); assertThat(actualSource).isInstanceOf(PscDynamicSource.class); - final PscDynamicSource pscSource = (PscDynamicSource) actualSource; - 
assertThat(pscSource.enableRescale).isTrue(); + // Note: The actual rescale decision is made in the factory based on + // partition count vs parallelism comparison. This test just verifies + // that the flag can be enabled without errors. + // In real scenarios, rescale will only be applied if: + // 1. scan.enable-rescale = true + // 2. table.exec.resource.default-parallelism > partition count + // 3. Metadata query succeeds } @Test From 89e91d406a077d8754088da7d4bacb817a2875f9 Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Thu, 30 Oct 2025 13:08:53 -0400 Subject: [PATCH 8/9] make common util --- .../psc/table/PscDynamicTableFactory.java | 125 +----------- .../connectors/psc/table/PscRescaleUtil.java | 182 ++++++++++++++++++ .../table/UpsertPscDynamicTableFactory.java | 133 +------------ 3 files changed, 184 insertions(+), 256 deletions(-) create mode 100644 psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscRescaleUtil.java diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java index 694fcdaf..885959f0 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java @@ -23,10 +23,6 @@ import com.pinterest.flink.streaming.connectors.psc.config.StartupMode; import com.pinterest.flink.streaming.connectors.psc.internals.PscTopicUriPartition; import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkPscPartitioner; -import com.pinterest.psc.common.TopicUri; -import com.pinterest.psc.config.PscConfiguration; -import com.pinterest.psc.metadata.TopicUriMetadata; -import com.pinterest.psc.metadata.client.PscMetadataClient; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -36,7 +32,6 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; @@ -220,7 +215,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); // Determine if rescale should be applied based on parallelism vs partition count - final boolean shouldRescale = shouldApplyRescale( + final boolean shouldRescale = PscRescaleUtil.shouldApplyRescale( tableOptions, context.getConfiguration(), getSourceTopicUris(tableOptions), @@ -393,124 +388,6 @@ private static DeliveryGuarantee validateDeprecatedSemantic(ReadableConfig table // -------------------------------------------------------------------------------------------- - /** - * Determines whether rescale() should be applied based on: - * 1. scan.enable-rescale flag must be true - * 2. Global parallelism (table.exec.resource.default-parallelism) > partition count - * - * This automatically avoids unnecessary shuffle overhead when partitions >= parallelism. 
- */ - private boolean shouldApplyRescale( - ReadableConfig tableOptions, - ReadableConfig globalConfig, - List topicUris, - Properties pscProperties) { - - // First check if rescale is enabled by user - if (!tableOptions.get(SCAN_ENABLE_RESCALE)) { - return false; - } - - // Get the global default parallelism - int defaultParallelism = globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); - - // If parallelism is -1 (not set) or <= 0, cannot determine, so don't apply rescale - if (defaultParallelism <= 0) { - LOG.info("scan.enable-rescale is true, but table.exec.resource.default-parallelism is {} (not set). " + - "Rescale will not be applied. Set a positive parallelism value to enable automatic rescale.", - defaultParallelism); - return false; - } - - // Query partition count from PSC metadata - int partitionCount = getTopicPartitionCount(topicUris, pscProperties); - - // If partition count couldn't be determined, don't apply rescale (fail-safe) - if (partitionCount <= 0) { - LOG.warn("scan.enable-rescale is true, but partition count could not be determined. " + - "Rescale will not be applied."); - return false; - } - - // Apply rescale only if user-configured parallelism exceeds partition count - boolean shouldRescale = defaultParallelism > partitionCount; - - if (shouldRescale) { - LOG.info("Applying rescale(): configured parallelism ({}) > partition count ({}). " + - "Data will be redistributed to fully utilize downstream operators.", - defaultParallelism, partitionCount); - } else { - LOG.info("Skipping rescale(): configured parallelism ({}) <= partition count ({}). " + - "No shuffle needed as source will naturally match or exceed desired parallelism.", - defaultParallelism, partitionCount); - } - - return shouldRescale; - } - - /** - * Queries the minimum partition count across all specified topic URIs. - * Returns -1 if partition count cannot be determined. - */ - private int getTopicPartitionCount(List topicUris, Properties pscProperties) { - if (topicUris == null || topicUris.isEmpty()) { - LOG.warn("No topic URIs provided for partition count query."); - return -1; - } - - PscMetadataClient metadataClient = null; - try { - // Create PSC configuration from properties - PscConfiguration pscConfig = new PscConfiguration(); - for (String key : pscProperties.stringPropertyNames()) { - pscConfig.setProperty(key, pscProperties.getProperty(key)); - } - - metadataClient = new PscMetadataClient(pscConfig); - - int minPartitionCount = Integer.MAX_VALUE; - - for (String topicUriStr : topicUris) { - try { - TopicUri topicUri = TopicUri.validate(topicUriStr); - - // Query metadata for this topic - Map metadataMap = metadataClient.describeTopicUris( - topicUri, // cluster URI (can use full topic URI) - java.util.Collections.singleton(topicUri), - Duration.ofSeconds(10)); - - TopicUriMetadata metadata = metadataMap.get(topicUri); - if (metadata != null) { - int partitionCount = metadata.getTopicUriPartitions().size(); - LOG.debug("Topic {} has {} partitions", topicUriStr, partitionCount); - minPartitionCount = Math.min(minPartitionCount, partitionCount); - } else { - LOG.warn("No metadata returned for topic {}", topicUriStr); - } - } catch (Exception e) { - LOG.warn("Failed to query partition count for topic {}: {}", - topicUriStr, e.getMessage()); - } - } - - return (minPartitionCount == Integer.MAX_VALUE) ? 
-1 : minPartitionCount; - - } catch (Exception e) { - LOG.warn("Failed to create PSC metadata client or query partition count: {}", - e.getMessage()); - return -1; - } finally { - if (metadataClient != null) { - try { - metadataClient.close(); - } catch (Exception e) { - LOG.warn("Failed to close PSC metadata client: {}", e.getMessage()); - } - } - } - } - protected PscDynamicSource createPscTableSource( DataType physicalDataType, @Nullable DecodingFormat> keyDecodingFormat, diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscRescaleUtil.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscRescaleUtil.java new file mode 100644 index 00000000..9c55dcde --- /dev/null +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscRescaleUtil.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pinterest.flink.streaming.connectors.psc.table; + +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.metadata.TopicUriMetadata; +import com.pinterest.psc.metadata.client.PscMetadataClient; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_ENABLE_RESCALE; + +/** + * Utility class for smart rescale decision logic shared between PSC table factories. + * + *
+ * <p>This class provides methods to determine whether rescale() should be applied to a PSC source
+ * based on comparing the configured pipeline parallelism with the actual topic partition count.
+ */
+public class PscRescaleUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PscRescaleUtil.class);
+
+    /**
+     * Determines whether rescale() should be applied based on:
+     * 1. scan.enable-rescale flag must be true
+     * 2. Global parallelism (table.exec.resource.default-parallelism) > partition count
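+     *
+     * <p>A sketch of the intended call site (argument names mirror the factory code
+     * in this patch):
+     * <pre>{@code
+     * boolean shouldRescale = PscRescaleUtil.shouldApplyRescale(
+     *         tableOptions,                      // table options, checked for scan.enable-rescale
+     *         context.getConfiguration(),        // global config, supplies default parallelism
+     *         getSourceTopicUris(tableOptions),  // topics whose partition counts are queried
+     *         properties);                       // PSC properties for the metadata client
+     * }</pre>
+     *
+     * <p>For example, with default parallelism 32 and an 8-partition topic, rescale is
+     * applied; with default parallelism 4 and the same topic it is skipped.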
+     * <p>
This automatically avoids unnecessary shuffle overhead when partitions >= parallelism. + * + * @param tableOptions User's table configuration options + * @param globalConfig Global Flink configuration + * @param topicUris List of topic URIs to query for partition counts + * @param pscProperties PSC properties for metadata client connection + * @return true if rescale should be applied, false otherwise + */ + public static boolean shouldApplyRescale( + ReadableConfig tableOptions, + ReadableConfig globalConfig, + List topicUris, + Properties pscProperties) { + + // First check if rescale is enabled by user + if (!tableOptions.get(SCAN_ENABLE_RESCALE)) { + return false; + } + + // Get the global default parallelism + int defaultParallelism = globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + + // If parallelism is -1 (not set) or <= 0, cannot determine, so don't apply rescale + if (defaultParallelism <= 0) { + LOG.info("scan.enable-rescale is true, but table.exec.resource.default-parallelism is {} (not set). " + + "Rescale will not be applied. Set a positive parallelism value to enable automatic rescale.", + defaultParallelism); + return false; + } + + // Query partition count from PSC metadata + int partitionCount = getTopicPartitionCount(topicUris, pscProperties); + + // If partition count couldn't be determined, don't apply rescale (fail-safe) + if (partitionCount <= 0) { + LOG.warn("scan.enable-rescale is true, but partition count could not be determined. " + + "Rescale will not be applied."); + return false; + } + + // Apply rescale only if user-configured parallelism exceeds partition count + boolean shouldRescale = defaultParallelism > partitionCount; + + if (shouldRescale) { + LOG.info("Applying rescale(): configured parallelism ({}) > partition count ({}). " + + "Data will be redistributed to fully utilize downstream operators.", + defaultParallelism, partitionCount); + } else { + LOG.info("Skipping rescale(): configured parallelism ({}) <= partition count ({}). " + + "No shuffle needed as source will naturally match or exceed desired parallelism.", + defaultParallelism, partitionCount); + } + + return shouldRescale; + } + + /** + * Queries the minimum partition count across all specified topic URIs. + * + *
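+     * <p>Each per-topic lookup uses the fixed 10-second timeout below; failures for
+     * individual topics are logged and skipped rather than failing the whole query.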
+     * <p>
For multi-topic sources, returns the minimum partition count as a conservative approach. + * If any topic has fewer partitions, that becomes the bottleneck. + * + * @param topicUris List of topic URIs to query + * @param pscProperties PSC properties for metadata client connection + * @return Minimum partition count across all topics, or -1 if count cannot be determined + */ + private static int getTopicPartitionCount(List topicUris, Properties pscProperties) { + if (topicUris == null || topicUris.isEmpty()) { + LOG.warn("No topic URIs provided for partition count query."); + return -1; + } + + PscMetadataClient metadataClient = null; + try { + // Create PSC configuration from properties + PscConfiguration pscConfig = new PscConfiguration(); + for (String key : pscProperties.stringPropertyNames()) { + pscConfig.setProperty(key, pscProperties.getProperty(key)); + } + + metadataClient = new PscMetadataClient(pscConfig); + + int minPartitionCount = Integer.MAX_VALUE; + + for (String topicUriStr : topicUris) { + try { + TopicUri topicUri = TopicUri.validate(topicUriStr); + + // Query metadata for this topic + Map metadataMap = metadataClient.describeTopicUris( + topicUri, // cluster URI (can use full topic URI) + java.util.Collections.singleton(topicUri), + Duration.ofSeconds(10)); + + TopicUriMetadata metadata = metadataMap.get(topicUri); + if (metadata != null) { + int partitionCount = metadata.getTopicUriPartitions().size(); + LOG.debug("Topic {} has {} partitions", topicUriStr, partitionCount); + minPartitionCount = Math.min(minPartitionCount, partitionCount); + } else { + LOG.warn("No metadata returned for topic {}", topicUriStr); + } + } catch (Exception e) { + LOG.warn("Failed to query partition count for topic {}: {}", + topicUriStr, e.getMessage()); + } + } + + return (minPartitionCount == Integer.MAX_VALUE) ? -1 : minPartitionCount; + + } catch (Exception e) { + LOG.warn("Failed to create PSC metadata client or query partition count: {}", + e.getMessage()); + return -1; + } finally { + if (metadataClient != null) { + try { + metadataClient.close(); + } catch (Exception e) { + LOG.warn("Failed to close PSC metadata client: {}", e.getMessage()); + } + } + } + } + + /** Private constructor to prevent instantiation. 
*/ + private PscRescaleUtil() { + throw new UnsupportedOperationException("Utility class should not be instantiated"); + } +} + diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java index c2f91cca..bb914997 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java @@ -19,21 +19,13 @@ package com.pinterest.flink.streaming.connectors.psc.table; import com.pinterest.flink.streaming.connectors.psc.config.StartupMode; -import com.pinterest.psc.common.TopicUri; -import com.pinterest.psc.config.PscConfiguration; -import com.pinterest.psc.metadata.TopicUriMetadata; -import com.pinterest.psc.metadata.client.PscMetadataClient; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; @@ -93,8 +85,6 @@ public class UpsertPscDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { - private static final Logger LOG = LoggerFactory.getLogger(UpsertPscDynamicTableFactory.class); - public static final String IDENTIFIER = "upsert-psc"; @Override @@ -162,7 +152,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final PscConnectorOptionsUtil.BoundedOptions boundedOptions = getBoundedOptions(tableOptions); // Determine if rescale should be applied based on parallelism vs partition count - final boolean shouldRescale = shouldApplyRescale( + final boolean shouldRescale = PscRescaleUtil.shouldApplyRescale( tableOptions, context.getConfiguration(), getSourceTopicUris(tableOptions), @@ -448,125 +438,4 @@ public int hashCode() { } } - // -------------------------------------------------------------------------------------------- - // Helper methods for smart rescale logic - // -------------------------------------------------------------------------------------------- - - /** - * Determines whether rescale() should be applied based on: - * 1. scan.enable-rescale flag must be true - * 2. Global parallelism (table.exec.resource.default-parallelism) > partition count - * - * This automatically avoids unnecessary shuffle overhead when partitions >= parallelism. 
- */ - private boolean shouldApplyRescale( - ReadableConfig tableOptions, - ReadableConfig globalConfig, - List topicUris, - Properties pscProperties) { - - // First check if rescale is enabled by user - if (!tableOptions.get(SCAN_ENABLE_RESCALE)) { - return false; - } - - // Get the global default parallelism - int defaultParallelism = globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); - - // If parallelism is -1 (not set) or <= 0, cannot determine, so don't apply rescale - if (defaultParallelism <= 0) { - LOG.info("scan.enable-rescale is true, but table.exec.resource.default-parallelism is {} (not set). " + - "Rescale will not be applied. Set a positive parallelism value to enable automatic rescale.", - defaultParallelism); - return false; - } - - // Query partition count from PSC metadata - int partitionCount = getTopicPartitionCount(topicUris, pscProperties); - - // If partition count couldn't be determined, don't apply rescale (fail-safe) - if (partitionCount <= 0) { - LOG.warn("scan.enable-rescale is true, but partition count could not be determined. " + - "Rescale will not be applied."); - return false; - } - - // Apply rescale only if user-configured parallelism exceeds partition count - boolean shouldRescale = defaultParallelism > partitionCount; - - if (shouldRescale) { - LOG.info("Applying rescale(): configured parallelism ({}) > partition count ({}). " + - "Data will be redistributed to fully utilize downstream operators.", - defaultParallelism, partitionCount); - } else { - LOG.info("Skipping rescale(): configured parallelism ({}) <= partition count ({}). " + - "No shuffle needed as source will naturally match or exceed desired parallelism.", - defaultParallelism, partitionCount); - } - - return shouldRescale; - } - - /** - * Queries the minimum partition count across all specified topic URIs. - * Returns -1 if partition count cannot be determined. - */ - private int getTopicPartitionCount(List topicUris, Properties pscProperties) { - if (topicUris == null || topicUris.isEmpty()) { - LOG.warn("No topic URIs provided for partition count query."); - return -1; - } - - PscMetadataClient metadataClient = null; - try { - // Create PSC configuration from properties - PscConfiguration pscConfig = new PscConfiguration(); - for (String key : pscProperties.stringPropertyNames()) { - pscConfig.setProperty(key, pscProperties.getProperty(key)); - } - - metadataClient = new PscMetadataClient(pscConfig); - - int minPartitionCount = Integer.MAX_VALUE; - - for (String topicUriStr : topicUris) { - try { - TopicUri topicUri = TopicUri.validate(topicUriStr); - - // Query metadata for this topic - java.util.Map metadataMap = metadataClient.describeTopicUris( - topicUri, // cluster URI (can use full topic URI) - java.util.Collections.singleton(topicUri), - java.time.Duration.ofSeconds(10)); - - TopicUriMetadata metadata = metadataMap.get(topicUri); - if (metadata != null) { - int partitionCount = metadata.getTopicUriPartitions().size(); - LOG.debug("Topic {} has {} partitions", topicUriStr, partitionCount); - minPartitionCount = Math.min(minPartitionCount, partitionCount); - } else { - LOG.warn("No metadata returned for topic {}", topicUriStr); - } - } catch (Exception e) { - LOG.warn("Failed to query partition count for topic {}: {}", - topicUriStr, e.getMessage()); - } - } - - return (minPartitionCount == Integer.MAX_VALUE) ? 
-1 : minPartitionCount; - - } catch (Exception e) { - LOG.warn("Failed to create PSC metadata client or query partition count: {}", - e.getMessage()); - return -1; - } finally { - if (metadataClient != null) { - try { - metadataClient.close(); - } catch (Exception e) { - LOG.warn("Failed to close PSC metadata client: {}", e.getMessage()); - } - } - } - } } From e20b397e5887c0a171097f9ae4a2f0430b90732d Mon Sep 17 00:00:00 2001 From: Kevin Browne Date: Fri, 31 Oct 2025 13:19:18 -0400 Subject: [PATCH 9/9] create TableCommonUtils --- .../psc/table/PscDynamicTableFactory.java | 3 ++- ...PscRescaleUtil.java => PscTableCommonUtils.java} | 13 +++++++------ .../psc/table/UpsertPscDynamicTableFactory.java | 3 ++- 3 files changed, 11 insertions(+), 8 deletions(-) rename psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/{PscRescaleUtil.java => PscTableCommonUtils.java} (95%) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java index 885959f0..358d7e21 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java @@ -96,6 +96,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateDeliveryGuarantee; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateTableSinkOptions; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateTableSourceOptions; +import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.shouldApplyRescale; /** * Factory for creating configured instances of {@link PscDynamicSource} and {@link @@ -215,7 +216,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); // Determine if rescale should be applied based on parallelism vs partition count - final boolean shouldRescale = PscRescaleUtil.shouldApplyRescale( + final boolean shouldRescale = shouldApplyRescale( tableOptions, context.getConfiguration(), getSourceTopicUris(tableOptions), diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscRescaleUtil.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java similarity index 95% rename from psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscRescaleUtil.java rename to psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java index 9c55dcde..24858b5d 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscRescaleUtil.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java @@ -35,14 +35,14 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_ENABLE_RESCALE; /** - * Utility class for smart rescale decision logic shared between PSC table factories. + * Common utility functions for PSC dynamic table source and sink. * - *
- * <p>This class provides methods to determine whether rescale() should be applied to a PSC source
- * based on comparing the configured pipeline parallelism with the actual topic partition count.
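+ * <p>Note that the factories import {@code shouldApplyRescale} statically, so this
+ * rename only touches the import lines at the call sites.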
+ * <p>
This class provides shared utility methods used across PSC table factories, + * including smart rescale decision logic, metadata queries, and other common operations. */ -public class PscRescaleUtil { +public class PscTableCommonUtils { - private static final Logger LOG = LoggerFactory.getLogger(PscRescaleUtil.class); + private static final Logger LOG = LoggerFactory.getLogger(PscTableCommonUtils.class); /** * Determines whether rescale() should be applied based on: @@ -175,8 +175,9 @@ private static int getTopicPartitionCount(List topicUris, Properties psc } /** Private constructor to prevent instantiation. */ - private PscRescaleUtil() { + private PscTableCommonUtils() { throw new UnsupportedOperationException("Utility class should not be instantiated"); } } + diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java index bb914997..0ea9a9f4 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java @@ -80,6 +80,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateScanBoundedMode; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateConsumerClientOptions; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateProducerClientOptions; +import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.shouldApplyRescale; /** Upsert-Psc factory. */ public class UpsertPscDynamicTableFactory @@ -152,7 +153,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final PscConnectorOptionsUtil.BoundedOptions boundedOptions = getBoundedOptions(tableOptions); // Determine if rescale should be applied based on parallelism vs partition count - final boolean shouldRescale = PscRescaleUtil.shouldApplyRescale( + final boolean shouldRescale = shouldApplyRescale( tableOptions, context.getConfiguration(), getSourceTopicUris(tableOptions),