123 changes: 13 additions & 110 deletions api/src/main/java/org/apache/iceberg/transforms/Bucket.java
@@ -19,9 +19,6 @@
package org.apache.iceberg.transforms;

import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.function.Function;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundTransform;
@@ -32,8 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BucketUtil;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.BucketHash;
import org.apache.iceberg.util.SerializableFunction;

class Bucket<T> implements Transform<T, Integer>, Serializable {
@@ -55,28 +51,7 @@ static <T, B extends Bucket<T> & SerializableFunction<T, Integer>> B get(
Preconditions.checkArgument(
numBuckets > 0, "Invalid number of buckets: %s (must be > 0)", numBuckets);

switch (type.typeId()) {
case DATE:
case INTEGER:
return (B) new BucketInteger(numBuckets);
case TIME:
case TIMESTAMP:
case LONG:
return (B) new BucketLong(numBuckets);
case DECIMAL:
return (B) new BucketDecimal(numBuckets);
case STRING:
return (B) new BucketString(numBuckets);
case FIXED:
case BINARY:
return (B) new BucketByteBuffer(numBuckets);
case TIMESTAMP_NANO:
return (B) new BucketTimestampNano(numBuckets);
case UUID:
return (B) new BucketUUID(numBuckets);
default:
throw new IllegalArgumentException("Cannot bucket by type: " + type);
}
return (B) new BucketFunction<>(numBuckets, BucketHash.forType(type));
}

private final int numBuckets;
@@ -131,6 +106,10 @@ public boolean canTransform(Type type) {
case DECIMAL:
case UUID:
return true;
case STRUCT:
return type.asStructType().fields().stream()
.map(Types.NestedField::type)
.allMatch(this::canTransform);
}
return false;
}
@@ -206,95 +185,19 @@ public Type getResultType(Type sourceType) {
return Types.IntegerType.get();
}

private static class BucketInteger extends Bucket<Integer>
implements SerializableFunction<Integer, Integer> {

private BucketInteger(int numBuckets) {
super(numBuckets);
}

@Override
protected int hash(Integer value) {
return BucketUtil.hash(value);
}
}

private static class BucketLong extends Bucket<Long>
implements SerializableFunction<Long, Integer> {

private BucketLong(int numBuckets) {
super(numBuckets);
}

@Override
protected int hash(Long value) {
return BucketUtil.hash(value);
}
}

// In order to bucket TimestampNano the same as Timestamp, convert to micros before hashing.
private static class BucketTimestampNano extends Bucket<Long>
implements SerializableFunction<Long, Integer> {

private BucketTimestampNano(int numBuckets) {
super(numBuckets);
}

@Override
protected int hash(Long nanos) {
return BucketUtil.hash(DateTimeUtil.nanosToMicros(nanos));
}
}

private static class BucketString extends Bucket<CharSequence>
implements SerializableFunction<CharSequence, Integer> {

private BucketString(int numBuckets) {
super(numBuckets);
}

@Override
protected int hash(CharSequence value) {
return BucketUtil.hash(value);
}
}

private static class BucketByteBuffer extends Bucket<ByteBuffer>
implements SerializableFunction<ByteBuffer, Integer> {

private BucketByteBuffer(int numBuckets) {
super(numBuckets);
}

@Override
protected int hash(ByteBuffer value) {
return BucketUtil.hash(value);
}
}

private static class BucketUUID extends Bucket<UUID>
implements SerializableFunction<UUID, Integer> {

private BucketUUID(int numBuckets) {
super(numBuckets);
}

@Override
public int hash(UUID value) {
return BucketUtil.hash(value);
}
}

  private static class BucketDecimal extends Bucket<BigDecimal>
      implements SerializableFunction<BigDecimal, Integer> {

    private BucketDecimal(int numBuckets) {
      super(numBuckets);
    }

    @Override
    protected int hash(BigDecimal value) {
      return BucketUtil.hash(value);
    }
  }

  private static class BucketFunction<T> extends Bucket<T>
      implements SerializableFunction<T, Integer> {

    private final BucketHash<T> bucketHash;

    private BucketFunction(int numBuckets, BucketHash<T> bucketHash) {
      super(numBuckets);
      this.bucketHash = bucketHash;
    }

    @Override
    protected int hash(T value) {
      return bucketHash.hash(value);
    }
  }
}
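
With the per-type subclasses removed, Bucket.get now wraps a single BucketFunction around whatever hasher BucketHash.forType (new file below) resolves for the source type. A minimal usage sketch through the public Transforms API, which is assumed here (it is not shown in this diff) to route into Bucket.get; per the Iceberg bucket spec the bound function returns (hash & Integer.MAX_VALUE) % numBuckets:

// Sketch only: exercises the refactored path via the public API; after this change
// the same call also accepts struct source types. Class and variable names are illustrative.
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableFunction;

public class BucketTransformSketch {
  public static void main(String[] args) {
    Transform<Object, Integer> bucket = Transforms.bucket(16);
    SerializableFunction<Object, Integer> toBucket = bucket.bind(Types.StringType.get());
    // murmur3 hash of the UTF-8 bytes, masked positive, modulo 16
    System.out.println(toBucket.apply("iceberg"));
  }
}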
58 changes: 58 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/BucketHash.java
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.iceberg.types.Type;

@FunctionalInterface
public interface BucketHash<T> extends Serializable {
int hash(T value);

@SuppressWarnings("unchecked")
static <T> BucketHash<T> forType(Type type) {
switch (type.typeId()) {
case DATE:
case INTEGER:
return value -> BucketUtil.hash((int) value);
case TIME:
case TIMESTAMP:
case LONG:
return value -> BucketUtil.hash((long) value);
case DECIMAL:
return value -> BucketUtil.hash((BigDecimal) value);
case STRING:
return value -> BucketUtil.hash((CharSequence) value);
case FIXED:
case BINARY:
return value -> BucketUtil.hash((ByteBuffer) value);
case TIMESTAMP_NANO:
return nanos -> BucketUtil.hash(DateTimeUtil.nanosToMicros((long) nanos));
case UUID:
return value -> BucketUtil.hash((UUID) value);
case STRUCT:
return (BucketHash<T>) BucketHashes.struct(type.asStructType());
default:
throw new IllegalArgumentException("Cannot bucket by type: " + type);
}
}
}
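
BucketHash is the new serializable per-type dispatch: forType returns a lambda that casts the incoming value and forwards it to the matching BucketUtil overload, converting TIMESTAMP_NANO to micros first so nanosecond timestamps bucket the same as microsecond ones. A small sketch of using it directly (the timestamp type factories are assumed from the existing Types API; values are illustrative):

// Sketch only: resolve hashers per type and apply them to raw values.
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BucketHash;
import org.apache.iceberg.util.BucketUtil;
import org.apache.iceberg.util.DateTimeUtil;

public class BucketHashSketch {
  public static void main(String[] args) {
    BucketHash<CharSequence> strHash = BucketHash.forType(Types.StringType.get());
    BucketHash<Long> nanoHash = BucketHash.forType(Types.TimestampNanoType.withoutZone());
    BucketHash<Long> microHash = BucketHash.forType(Types.TimestampType.withoutZone());

    // Same result as calling the BucketUtil overload directly.
    System.out.println(strHash.hash("iceberg") == BucketUtil.hash("iceberg"));

    // Nanosecond timestamps hash like their microsecond truncation.
    long nanos = 1_510_871_468_000_000_123L;
    System.out.println(nanoHash.hash(nanos) == microHash.hash(DateTimeUtil.nanosToMicros(nanos)));
  }
}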
59 changes: 59 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/BucketHashes.java
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import java.util.function.IntFunction;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.hash.Hasher;
import org.apache.iceberg.types.Types;

public class BucketHashes {
private BucketHashes() {}

static BucketHash<StructLike> struct(Types.StructType struct) {
return new StructHash(struct);
}

private static class StructHash implements BucketHash<StructLike> {
private final BucketHash<Object>[] hashes;

private StructHash(Types.StructType struct) {
this.hashes =
struct.fields().stream()
.map(Types.NestedField::type)
.map(BucketHash::forType)
.toArray((IntFunction<BucketHash<Object>[]>) BucketHash[]::new);
}

@Override
public int hash(StructLike value) {
if (value == null) {
return 0;
}

Hasher hasher = BucketUtil.hashFunction().newHasher();
int len = hashes.length;
hasher.putInt(len);
for (int i = 0; i < len; i += 1) {
hasher.putInt(hashes[i].hash(value.get(i, Object.class)));
}
return hasher.hash().asInt();
}
}
}
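
StructHash resolves one hasher per field up front, then combines them: the field count and each field's bucket hash are fed, in order, into a fresh murmur3 hasher, and the combined 32-bit value is returned (a null struct hashes to 0). The combination can be reproduced by hand, which is also what testStruct below verifies; the field values here are illustrative:

// Sketch only: rebuild the struct hash for a two-field (INTEGER, STRING) struct.
import org.apache.iceberg.relocated.com.google.common.hash.Hasher;
import org.apache.iceberg.util.BucketUtil;

public class StructHashSketch {
  public static void main(String[] args) {
    int intField = 34;
    String strField = "iceberg";

    Hasher hasher = BucketUtil.hashFunction().newHasher();
    hasher.putInt(2);                          // number of struct fields
    hasher.putInt(BucketUtil.hash(intField));  // bucket hash of the INTEGER field
    hasher.putInt(BucketUtil.hash(strField));  // bucket hash of the STRING field
    System.out.println(hasher.hash().asInt()); // what StructHash#hash returns for this row
  }
}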
4 changes: 4 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/BucketUtil.java
@@ -101,4 +101,8 @@ public static int hash(UUID value) {
public static int hash(BigDecimal value) {
return MURMUR3.hashBytes(value.unscaledValue().toByteArray()).asInt();
}

public static HashFunction hashFunction() {
return MURMUR3;
}
}
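
hashFunction() simply exposes the shared MURMUR3 instance so that composite hashers such as StructHash stay on the same 32-bit murmur3 as the scalar overloads. A quick consistency sketch, assuming hash(CharSequence) is murmur3 over the UTF-8 bytes as the bucket spec requires:

// Sketch only: the exposed hash function and the string overload should agree.
import java.nio.charset.StandardCharsets;
import org.apache.iceberg.util.BucketUtil;

public class HashFunctionSketch {
  public static void main(String[] args) {
    int viaOverload = BucketUtil.hash("iceberg");
    int viaFunction =
        BucketUtil.hashFunction().hashString("iceberg", StandardCharsets.UTF_8).asInt();
    System.out.println(viaOverload == viaFunction); // expected: true
  }
}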
26 changes: 26 additions & 0 deletions api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java
@@ -30,11 +30,14 @@
import java.util.Random;
import java.util.UUID;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
import org.apache.iceberg.relocated.com.google.common.hash.Hasher;
import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BucketUtil;
import org.apache.iceberg.util.SerializableFunction;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -410,6 +413,29 @@ public void testUUIDHash() {
.isEqualTo(hashBytes(uuidBytes));
}

@SuppressWarnings({"rawtypes", "unchecked"})
@Test
void testStruct() {
Bucket<?> bucket = Bucket.get(Integer.MAX_VALUE);
Types.StructType structType =
Types.StructType.of(
Types.NestedField.optional(1, "intCol", Types.IntegerType.get()),
Types.NestedField.optional(2, "strCol", Types.StringType.get()));
assertThat(bucket.canTransform(structType)).isTrue();
SerializableFunction boundBucket = bucket.bind(structType);
TestHelpers.CustomRow row = TestHelpers.CustomRow.of(1, "aaa");

Hasher hasher = MURMUR3.newHasher();
hasher.putInt(structType.fields().size());
hasher.putInt(BucketUtil.hash(row.get(0, Integer.class)));
hasher.putInt(BucketUtil.hash(row.get(1, String.class)));
int expected = hasher.hash().asInt();

assertThat(boundBucket.apply(row))
.as("Struct hash should match hash of struct fields")
.isEqualTo(expected);
}

@Test
public void testVerifiedIllegalNumBuckets() {
assertThatThrownBy(() -> Bucket.get(0))