From 121771e5096ca94352a1b7c519b3ce6a87cba19a Mon Sep 17 00:00:00 2001 From: linyanghao Date: Mon, 30 Jun 2025 21:24:53 +0800 Subject: [PATCH 1/2] bucket by struct --- .../org/apache/iceberg/transforms/Bucket.java | 123 ++---------------- .../org/apache/iceberg/util/BucketHash.java | 58 +++++++++ .../org/apache/iceberg/util/BucketHashes.java | 59 +++++++++ .../org/apache/iceberg/util/BucketUtil.java | 4 + .../iceberg/transforms/TestBucketing.java | 26 ++++ 5 files changed, 160 insertions(+), 110 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/util/BucketHash.java create mode 100644 api/src/main/java/org/apache/iceberg/util/BucketHashes.java diff --git a/api/src/main/java/org/apache/iceberg/transforms/Bucket.java b/api/src/main/java/org/apache/iceberg/transforms/Bucket.java index 2b2439e3ed0a..a3867899b97d 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Bucket.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Bucket.java @@ -19,9 +19,6 @@ package org.apache.iceberg.transforms; import java.io.Serializable; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.util.UUID; import java.util.function.Function; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.BoundTransform; @@ -32,8 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.BucketUtil; -import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.BucketHash; import org.apache.iceberg.util.SerializableFunction; class Bucket implements Transform, Serializable { @@ -55,28 +51,7 @@ static & SerializableFunction> B get( Preconditions.checkArgument( numBuckets > 0, "Invalid number of buckets: %s (must be > 0)", numBuckets); - switch (type.typeId()) { - case DATE: - case INTEGER: - return (B) new BucketInteger(numBuckets); - case TIME: - case TIMESTAMP: - case LONG: - return (B) new BucketLong(numBuckets); - case DECIMAL: - return (B) new BucketDecimal(numBuckets); - case STRING: - return (B) new BucketString(numBuckets); - case FIXED: - case BINARY: - return (B) new BucketByteBuffer(numBuckets); - case TIMESTAMP_NANO: - return (B) new BucketTimestampNano(numBuckets); - case UUID: - return (B) new BucketUUID(numBuckets); - default: - throw new IllegalArgumentException("Cannot bucket by type: " + type); - } + return (B) new BucketFunction<>(numBuckets, BucketHash.forType(type)); } private final int numBuckets; @@ -131,6 +106,10 @@ public boolean canTransform(Type type) { case DECIMAL: case UUID: return true; + case STRUCT: + return type.asStructType().fields().stream() + .map(Types.NestedField::type) + .allMatch(this::canTransform); } return false; } @@ -206,95 +185,19 @@ public Type getResultType(Type sourceType) { return Types.IntegerType.get(); } - private static class BucketInteger extends Bucket - implements SerializableFunction { - - private BucketInteger(int numBuckets) { - super(numBuckets); - } - - @Override - protected int hash(Integer value) { - return BucketUtil.hash(value); - } - } - - private static class BucketLong extends Bucket - implements SerializableFunction { - - private BucketLong(int numBuckets) { - super(numBuckets); - } - - @Override - protected int hash(Long value) { - return BucketUtil.hash(value); - } - } - - // In order to bucket TimestampNano the same as Timestamp, convert to micros before hashing. - private static class BucketTimestampNano extends Bucket - implements SerializableFunction { - - private BucketTimestampNano(int numBuckets) { - super(numBuckets); - } - - @Override - protected int hash(Long nanos) { - return BucketUtil.hash(DateTimeUtil.nanosToMicros(nanos)); - } - } - - private static class BucketString extends Bucket - implements SerializableFunction { - - private BucketString(int numBuckets) { - super(numBuckets); - } - - @Override - protected int hash(CharSequence value) { - return BucketUtil.hash(value); - } - } - - private static class BucketByteBuffer extends Bucket - implements SerializableFunction { - - private BucketByteBuffer(int numBuckets) { - super(numBuckets); - } - - @Override - protected int hash(ByteBuffer value) { - return BucketUtil.hash(value); - } - } - - private static class BucketUUID extends Bucket - implements SerializableFunction { - - private BucketUUID(int numBuckets) { - super(numBuckets); - } - - @Override - public int hash(UUID value) { - return BucketUtil.hash(value); - } - } + private static class BucketFunction extends Bucket + implements SerializableFunction { - private static class BucketDecimal extends Bucket - implements SerializableFunction { + private final BucketHash bucketHash; - private BucketDecimal(int numBuckets) { + private BucketFunction(int numBuckets, BucketHash bucketHash) { super(numBuckets); + this.bucketHash = bucketHash; } @Override - protected int hash(BigDecimal value) { - return BucketUtil.hash(value); + protected int hash(T value) { + return bucketHash.hash(value); } } } diff --git a/api/src/main/java/org/apache/iceberg/util/BucketHash.java b/api/src/main/java/org/apache/iceberg/util/BucketHash.java new file mode 100644 index 000000000000..97bd733c6ef2 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/util/BucketHash.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.iceberg.types.Type; + +@FunctionalInterface +public interface BucketHash extends Serializable { + int hash(T value); + + @SuppressWarnings("unchecked") + static BucketHash forType(Type type) { + switch (type.typeId()) { + case DATE: + case INTEGER: + return value -> BucketUtil.hash((int) value); + case TIME: + case TIMESTAMP: + case LONG: + return value -> BucketUtil.hash((long) value); + case DECIMAL: + return value -> BucketUtil.hash((BigDecimal) value); + case STRING: + return value -> BucketUtil.hash((CharSequence) value); + case FIXED: + case BINARY: + return value -> BucketUtil.hash((ByteBuffer) value); + case TIMESTAMP_NANO: + return nanos -> BucketUtil.hash(DateTimeUtil.nanosToMicros((long) nanos)); + case UUID: + return value -> BucketUtil.hash((UUID) value); + case STRUCT: + return (BucketHash) BucketHashes.struct(type.asStructType()); + default: + throw new IllegalArgumentException("Cannot bucket by type: " + type); + } + } +} diff --git a/api/src/main/java/org/apache/iceberg/util/BucketHashes.java b/api/src/main/java/org/apache/iceberg/util/BucketHashes.java new file mode 100644 index 000000000000..f0eb0b06252f --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/util/BucketHashes.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.util.function.IntFunction; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.hash.Hasher; +import org.apache.iceberg.types.Types; + +public class BucketHashes { + private BucketHashes() {} + + static BucketHash struct(Types.StructType struct) { + return new StructHash(struct); + } + + private static class StructHash implements BucketHash { + private final BucketHash[] hashes; + + private StructHash(Types.StructType struct) { + this.hashes = + struct.fields().stream() + .map(Types.NestedField::type) + .map(BucketHash::forType) + .toArray((IntFunction[]>) BucketHash[]::new); + } + + @Override + public int hash(StructLike value) { + if (value == null) { + return 0; + } + + Hasher hasher = BucketUtil.hashFunction().newHasher(); + int len = hashes.length; + hasher.putInt(len); + for (int i = 0; i < len; i += 1) { + hasher.putInt(hashes[i].hash(value.get(i, Object.class))); + } + return hasher.hash().asInt(); + } + } +} diff --git a/api/src/main/java/org/apache/iceberg/util/BucketUtil.java b/api/src/main/java/org/apache/iceberg/util/BucketUtil.java index c6d654896b50..6f0094cfa1f9 100644 --- a/api/src/main/java/org/apache/iceberg/util/BucketUtil.java +++ b/api/src/main/java/org/apache/iceberg/util/BucketUtil.java @@ -101,4 +101,8 @@ public static int hash(UUID value) { public static int hash(BigDecimal value) { return MURMUR3.hashBytes(value.unscaledValue().toByteArray()).asInt(); } + + public static HashFunction hashFunction() { + return MURMUR3; + } } diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java b/api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java index 81f4fa6098e2..a07f71146a0e 100644 --- a/api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java +++ b/api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java @@ -30,11 +30,14 @@ import java.util.Random; import java.util.UUID; import org.apache.avro.util.Utf8; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.relocated.com.google.common.hash.HashFunction; +import org.apache.iceberg.relocated.com.google.common.hash.Hasher; import org.apache.iceberg.relocated.com.google.common.hash.Hashing; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.BucketUtil; +import org.apache.iceberg.util.SerializableFunction; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -410,6 +413,29 @@ public void testUUIDHash() { .isEqualTo(hashBytes(uuidBytes)); } + @SuppressWarnings("rawtypes") + @Test + void testStruct() { + Bucket bucket = Bucket.get(Integer.MAX_VALUE); + Types.StructType structType = + Types.StructType.of( + Types.NestedField.optional(1, "intCol", Types.IntegerType.get()), + Types.NestedField.optional(2, "strCol", Types.StringType.get())); + assertThat(bucket.canTransform(structType)).isTrue(); + SerializableFunction boundBucket = bucket.bind(structType); + TestHelpers.CustomRow row = TestHelpers.CustomRow.of(1, "aaa"); + + Hasher hasher = MURMUR3.newHasher(); + hasher.putInt(structType.fields().size()); + hasher.putInt(BucketUtil.hash(row.get(0, Integer.class))); + hasher.putInt(BucketUtil.hash(row.get(1, String.class))); + int expected = hasher.hash().asInt(); + + assertThat(boundBucket.apply(row)) + .as("Struct hash should match hash of struct fields") + .isEqualTo(expected); + } + @Test public void testVerifiedIllegalNumBuckets() { assertThatThrownBy(() -> Bucket.get(0)) From 02756db738899f8776e100b8f686ec0f70780d92 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Mon, 30 Jun 2025 21:55:21 +0800 Subject: [PATCH 2/2] suppress --- .../test/java/org/apache/iceberg/transforms/TestBucketing.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java b/api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java index a07f71146a0e..b2c4c841ad4e 100644 --- a/api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java +++ b/api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java @@ -413,7 +413,7 @@ public void testUUIDHash() { .isEqualTo(hashBytes(uuidBytes)); } - @SuppressWarnings("rawtypes") + @SuppressWarnings({"rawtypes", "unchecked"}) @Test void testStruct() { Bucket bucket = Bucket.get(Integer.MAX_VALUE);