diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java index 21338b0d..bd8e5abc 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java @@ -109,6 +109,18 @@ public class PscConnectorOptions { public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; + public static final ConfigOption SCAN_ENABLE_RESCALE = + ConfigOptions.key("scan.enable-rescale") + .booleanType() + .defaultValue(false) + .withDescription( + "Enable smart rescale() to redistribute data when needed. When enabled, the connector " + + "automatically compares the configured pipeline parallelism (table.exec.resource.default-parallelism) " + + "with the topic's partition count. Rescale is applied ONLY when parallelism > partition count, " + + "avoiding unnecessary shuffle overhead otherwise. This allows downstream operators to fully utilize " + + "higher parallelism than the source partition count. " + + "Default: false (no automatic shuffling)."); + // -------------------------------------------------------------------------------------------- // Psc specific options // -------------------------------------------------------------------------------------------- diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java index 2ca65613..bad78fa6 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java @@ -169,6 +169,9 @@ public class PscDynamicSource /** Optional user-provided UID prefix for stabilizing operator UIDs across DAG changes. */ protected final @Nullable String sourceUidPrefix; + /** Enable rescale() shuffle to redistribute data across downstream operators. */ + protected final boolean enableRescale; + public PscDynamicSource( DataType physicalDataType, @Nullable DecodingFormat> keyDecodingFormat, @@ -187,7 +190,8 @@ public PscDynamicSource( long boundedTimestampMillis, boolean upsertMode, String tableIdentifier, - @Nullable String sourceUidPrefix) { + @Nullable String sourceUidPrefix, + boolean enableRescale) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -228,11 +232,12 @@ public PscDynamicSource( this.upsertMode = upsertMode; this.tableIdentifier = tableIdentifier; this.sourceUidPrefix = sourceUidPrefix; + this.enableRescale = enableRescale; } /** - * Backward-compatible constructor without UID prefix. Delegates to the full constructor with a - * null {@code sourceUidPrefix}. + * Backward-compatible constructor without UID prefix and rescale. Delegates to the full + * constructor with null {@code sourceUidPrefix} and false {@code enableRescale}. */ public PscDynamicSource( DataType physicalDataType, @@ -270,7 +275,8 @@ public PscDynamicSource( boundedTimestampMillis, upsertMode, tableIdentifier, - null); + null, + false); } @Override @@ -302,16 +308,24 @@ public DataStream produceDataStream( DataStreamSource sourceStream = execEnv.fromSource( pscSource, watermarkStrategy, "PscSource-" + tableIdentifier); + + // Let Flink handle source parallelism automatically (defaults to partition count) + // Only add rescale if explicitly enabled to redistribute data across downstream operators + DataStream resultStream = sourceStream; + if (enableRescale) { + resultStream = sourceStream.rescale(); + } + // Prefer explicit user-provided UID prefix if present; otherwise rely on provider context. if (sourceUidPrefix != null) { final String trimmedPrefix = sourceUidPrefix.trim(); if (!trimmedPrefix.isEmpty()) { sourceStream.uid(trimmedPrefix + ":" + PSC_TRANSFORMATION + ":" + tableIdentifier); - return sourceStream; + return resultStream; } } providerContext.generateUid(PSC_TRANSFORMATION).ifPresent(sourceStream::uid); - return sourceStream; + return resultStream; } @Override @@ -396,7 +410,8 @@ public DynamicTableSource copy() { boundedTimestampMillis, upsertMode, tableIdentifier, - sourceUidPrefix); + sourceUidPrefix, + enableRescale); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; @@ -436,6 +451,8 @@ public boolean equals(Object o) { && boundedTimestampMillis == that.boundedTimestampMillis && Objects.equals(upsertMode, that.upsertMode) && Objects.equals(tableIdentifier, that.tableIdentifier) + && Objects.equals(sourceUidPrefix, that.sourceUidPrefix) + && enableRescale == that.enableRescale && Objects.equals(watermarkStrategy, that.watermarkStrategy); } @@ -461,6 +478,8 @@ public int hashCode() { boundedTimestampMillis, upsertMode, tableIdentifier, + sourceUidPrefix, + enableRescale, watermarkStrategy); } diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java index 194cd629..358d7e21 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java @@ -75,6 +75,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_ENABLE_RESCALE; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI; @@ -95,6 +96,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateDeliveryGuarantee; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateTableSinkOptions; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateTableSourceOptions; +import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.shouldApplyRescale; /** * Factory for creating configured instances of {@link PscDynamicSource} and {@link @@ -149,6 +151,7 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_MODE); options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); + options.add(SCAN_ENABLE_RESCALE); options.add(SOURCE_UID_PREFIX); return options; } @@ -212,6 +215,13 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + // Determine if rescale should be applied based on parallelism vs partition count + final boolean shouldRescale = shouldApplyRescale( + tableOptions, + context.getConfiguration(), + getSourceTopicUris(tableOptions), + properties); + return createPscTableSource( physicalDataType, keyDecodingFormat.orElse(null), @@ -229,7 +239,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, context.getObjectIdentifier().asSummaryString(), - tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null)); + tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), + shouldRescale); } @Override @@ -395,7 +406,8 @@ protected PscDynamicSource createPscTableSource( Map specificEndOffsets, long endTimestampMillis, String tableIdentifier, - @Nullable String sourceUidPrefix) { + @Nullable String sourceUidPrefix, + boolean enableRescale) { return new PscDynamicSource( physicalDataType, keyDecodingFormat, @@ -414,7 +426,8 @@ protected PscDynamicSource createPscTableSource( endTimestampMillis, false, tableIdentifier, - sourceUidPrefix); + sourceUidPrefix, + enableRescale); } protected PscDynamicSink creatPscTableSink( diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java new file mode 100644 index 00000000..24858b5d --- /dev/null +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.pinterest.flink.streaming.connectors.psc.table; + +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.metadata.TopicUriMetadata; +import com.pinterest.psc.metadata.client.PscMetadataClient; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_ENABLE_RESCALE; + +/** + * Common utility functions for PSC dynamic table source and sink. + * + *

This class provides shared utility methods used across PSC table factories, + * including smart rescale decision logic, metadata queries, and other common operations. + */ +public class PscTableCommonUtils { + + private static final Logger LOG = LoggerFactory.getLogger(PscTableCommonUtils.class); + + /** + * Determines whether rescale() should be applied based on: + * 1. scan.enable-rescale flag must be true + * 2. Global parallelism (table.exec.resource.default-parallelism) > partition count + * + *

This automatically avoids unnecessary shuffle overhead when partitions >= parallelism. + * + * @param tableOptions User's table configuration options + * @param globalConfig Global Flink configuration + * @param topicUris List of topic URIs to query for partition counts + * @param pscProperties PSC properties for metadata client connection + * @return true if rescale should be applied, false otherwise + */ + public static boolean shouldApplyRescale( + ReadableConfig tableOptions, + ReadableConfig globalConfig, + List topicUris, + Properties pscProperties) { + + // First check if rescale is enabled by user + if (!tableOptions.get(SCAN_ENABLE_RESCALE)) { + return false; + } + + // Get the global default parallelism + int defaultParallelism = globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + + // If parallelism is -1 (not set) or <= 0, cannot determine, so don't apply rescale + if (defaultParallelism <= 0) { + LOG.info("scan.enable-rescale is true, but table.exec.resource.default-parallelism is {} (not set). " + + "Rescale will not be applied. Set a positive parallelism value to enable automatic rescale.", + defaultParallelism); + return false; + } + + // Query partition count from PSC metadata + int partitionCount = getTopicPartitionCount(topicUris, pscProperties); + + // If partition count couldn't be determined, don't apply rescale (fail-safe) + if (partitionCount <= 0) { + LOG.warn("scan.enable-rescale is true, but partition count could not be determined. " + + "Rescale will not be applied."); + return false; + } + + // Apply rescale only if user-configured parallelism exceeds partition count + boolean shouldRescale = defaultParallelism > partitionCount; + + if (shouldRescale) { + LOG.info("Applying rescale(): configured parallelism ({}) > partition count ({}). " + + "Data will be redistributed to fully utilize downstream operators.", + defaultParallelism, partitionCount); + } else { + LOG.info("Skipping rescale(): configured parallelism ({}) <= partition count ({}). " + + "No shuffle needed as source will naturally match or exceed desired parallelism.", + defaultParallelism, partitionCount); + } + + return shouldRescale; + } + + /** + * Queries the minimum partition count across all specified topic URIs. + * + *

For multi-topic sources, returns the minimum partition count as a conservative approach. + * If any topic has fewer partitions, that becomes the bottleneck. + * + * @param topicUris List of topic URIs to query + * @param pscProperties PSC properties for metadata client connection + * @return Minimum partition count across all topics, or -1 if count cannot be determined + */ + private static int getTopicPartitionCount(List topicUris, Properties pscProperties) { + if (topicUris == null || topicUris.isEmpty()) { + LOG.warn("No topic URIs provided for partition count query."); + return -1; + } + + PscMetadataClient metadataClient = null; + try { + // Create PSC configuration from properties + PscConfiguration pscConfig = new PscConfiguration(); + for (String key : pscProperties.stringPropertyNames()) { + pscConfig.setProperty(key, pscProperties.getProperty(key)); + } + + metadataClient = new PscMetadataClient(pscConfig); + + int minPartitionCount = Integer.MAX_VALUE; + + for (String topicUriStr : topicUris) { + try { + TopicUri topicUri = TopicUri.validate(topicUriStr); + + // Query metadata for this topic + Map metadataMap = metadataClient.describeTopicUris( + topicUri, // cluster URI (can use full topic URI) + java.util.Collections.singleton(topicUri), + Duration.ofSeconds(10)); + + TopicUriMetadata metadata = metadataMap.get(topicUri); + if (metadata != null) { + int partitionCount = metadata.getTopicUriPartitions().size(); + LOG.debug("Topic {} has {} partitions", topicUriStr, partitionCount); + minPartitionCount = Math.min(minPartitionCount, partitionCount); + } else { + LOG.warn("No metadata returned for topic {}", topicUriStr); + } + } catch (Exception e) { + LOG.warn("Failed to query partition count for topic {}: {}", + topicUriStr, e.getMessage()); + } + } + + return (minPartitionCount == Integer.MAX_VALUE) ? -1 : minPartitionCount; + + } catch (Exception e) { + LOG.warn("Failed to create PSC metadata client or query partition count: {}", + e.getMessage()); + return -1; + } finally { + if (metadataClient != null) { + try { + metadataClient.close(); + } catch (Exception e) { + LOG.warn("Failed to close PSC metadata client: {}", e.getMessage()); + } + } + } + } + + /** Private constructor to prevent instantiation. */ + private PscTableCommonUtils() { + throw new UnsupportedOperationException("Utility class should not be instantiated"); + } +} + + diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java index 3e599897..0ea9a9f4 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; @@ -61,6 +60,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_MODE; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_ENABLE_RESCALE; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM; @@ -80,6 +80,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateScanBoundedMode; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateConsumerClientOptions; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateProducerClientOptions; +import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.shouldApplyRescale; /** Upsert-Psc factory. */ public class UpsertPscDynamicTableFactory @@ -112,6 +113,7 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_MODE); options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); + options.add(SCAN_ENABLE_RESCALE); options.add(DELIVERY_GUARANTEE); options.add(TRANSACTIONAL_ID_PREFIX); options.add(SOURCE_UID_PREFIX); @@ -150,6 +152,13 @@ public DynamicTableSource createDynamicTableSource(Context context) { final PscConnectorOptionsUtil.BoundedOptions boundedOptions = getBoundedOptions(tableOptions); + // Determine if rescale should be applied based on parallelism vs partition count + final boolean shouldRescale = shouldApplyRescale( + tableOptions, + context.getConfiguration(), + getSourceTopicUris(tableOptions), + properties); + return new PscDynamicSource( context.getPhysicalRowDataType(), keyDecodingFormat, @@ -168,7 +177,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.boundedTimestampMillis, true, context.getObjectIdentifier().asSummaryString(), - tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null)); + tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), + shouldRescale); } @Override @@ -428,4 +438,5 @@ public int hashCode() { return Objects.hash(innerEncodingFormat); } } + } diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java index 3d253b6e..13b5a6ea 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java @@ -1246,6 +1246,39 @@ public void testTableSinkWithParallelism() { assertThat((long) sinkProvider.getParallelism().get()).isEqualTo(100); } + @Test + public void testTableSourceWithRescaleDisabled() { + // When scan.enable-rescale is false (default), rescale should not be applied + final Map modifiedOptions = getBasicSourceOptions(); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.enableRescale).isFalse(); + } + + @Test + public void testTableSourceWithRescaleEnabled() { + // When scan.enable-rescale is true but parallelism is not configured, + // rescale should not be applied (fail-safe behavior) + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> options.put("scan.enable-rescale", "true")); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + // Note: The actual rescale decision is made in the factory based on + // partition count vs parallelism comparison. This test just verifies + // that the flag can be enabled without errors. + // In real scenarios, rescale will only be applied if: + // 1. scan.enable-rescale = true + // 2. table.exec.resource.default-parallelism > partition count + // 3. Metadata query succeeds + } + @Test public void testTableSinkAutoCompleteSchemaRegistrySubject() { // only format @@ -1729,7 +1762,9 @@ private static PscDynamicSource createExpectedScanSource( Collections.emptyMap(), 0, false, - FactoryMocks.IDENTIFIER.asSummaryString()); + FactoryMocks.IDENTIFIER.asSummaryString(), + null, + false); } private static PscDynamicSink createExpectedSink(