diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOperation.java b/src/main/java/parallelai/spyglass/hbase/HBaseOperation.java
new file mode 100644
index 0000000..6f13596
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOperation.java
@@ -0,0 +1,61 @@
+package parallelai.spyglass.hbase;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+public abstract class HBaseOperation {
+  public enum OperationType {
+    PUT_COLUMN, DELETE_COLUMN, DELETE_FAMILY, DELETE_ROW, NO_OP
+  }
+
+  public static class PutColumn extends HBaseOperation {
+    private final ImmutableBytesWritable value;
+
+    public PutColumn(final ImmutableBytesWritable value) {
+      super(OperationType.PUT_COLUMN);
+      this.value = value;
+    }
+
+    public byte[] getBytes() {
+      return value.get();
+    }
+  }
+
+  public static class DeleteColumn extends HBaseOperation {
+    private DeleteColumn() {
+      super(OperationType.DELETE_COLUMN);
+    }
+  }
+
+  public static class DeleteFamily extends HBaseOperation {
+    private DeleteFamily() {
+      super(OperationType.DELETE_FAMILY);
+    }
+  }
+
+  public static class DeleteRow extends HBaseOperation {
+    private DeleteRow() {
+      super(OperationType.DELETE_ROW);
+    }
+  }
+
+  static class NoOp extends HBaseOperation {
+    private NoOp() {
+      super(OperationType.NO_OP);
+    }
+  }
+
+  public static final DeleteColumn DELETE_COLUMN = new DeleteColumn();
+  public static final DeleteFamily DELETE_FAMILY = new DeleteFamily();
+  public static final DeleteRow DELETE_ROW = new DeleteRow();
+  public static final NoOp NO_OP = new NoOp();
+
+  private final OperationType operationType;
+
+  private HBaseOperation(final OperationType operationType) {
+    this.operationType = operationType;
+  }
+
+  public OperationType getType() {
+    return operationType;
+  }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 3c62f82..e907b93 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -8,7 +8,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.Progressable;
@@ -17,7 +17,7 @@
  * Convert Map/Reduce output and write it to an HBase table
  */
 public class HBaseOutputFormat extends
-FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
+FileOutputFormat<ImmutableBytesWritable, Mutation> implements JobConfigurable {
 
   /** JobConf parameter that specifies the output table */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
@@ -68,4 +68,4 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job)
       throw new IOException("Must specify table name");
     }
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
index 50aa116..46129ed 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
@@ -1,12 +1,19 @@
 package parallelai.spyglass.hbase;
 
+import java.util.Map;
+import java.util.List;
+import java.util.LinkedList;
 import java.io.IOException;
 
 import cascading.tap.SinkMode;
 
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
@@ -15,9 +22,12 @@
  * and write to an HBase table
  */
 public class HBaseRecordWriter
-  implements RecordWriter<ImmutableBytesWritable, Put> {
+  implements RecordWriter<ImmutableBytesWritable, Mutation> {
   private HTable m_table;
   private SinkMode m_sinkMode = SinkMode.UPDATE;
+  private long deletesBufferSize;
+  private List<Delete> deletesBuffer = new LinkedList<Delete>();
+  private long currentDeletesBufferSize = 0; // in bytes
 
   /**
    * Instantiate a TableRecordWriter with the HBase HClient for writing.
@@ -26,30 +36,96 @@ public class HBaseRecordWriter
    */
   public HBaseRecordWriter(HTable table) {
     m_table = table;
+    deletesBufferSize = table.getConfiguration().getLong("spyglass.deletes.buffer", 524288);
   }
 
   public void close(Reporter reporter) throws IOException {
+    while (currentDeletesBufferSize > 0) {
+      flushDeletes();
+    }
     m_table.close();
   }
 
   public void setSinkMode(SinkMode sinkMode) {
-    m_sinkMode = sinkMode;
+    m_sinkMode = sinkMode;
   }
 
   public void write(ImmutableBytesWritable key,
-      Put value) throws IOException {
+      Mutation value) throws IOException {
     switch(m_sinkMode) {
-      case UPDATE:
-        m_table.put(new Put(value));
-        break;
+      case UPDATE:
+        if (value instanceof Put) {
+          m_table.put(new Put((Put) value));
+        } else if (value instanceof Delete) {
+          doDelete((Delete) value);
+        } else {
+          throw new RuntimeException("unsupported mutation"); // currently Append is not supported
+        }
+        break;
+
+      case REPLACE:
+        doDelete(new Delete(value.getRow()));
+        break;
+
+      default:
+        throw new IOException("Unknown Sink Mode : " + m_sinkMode);
+    }
+  }
+
+  private void doDelete(Delete delete) throws IOException {
+    currentDeletesBufferSize += heapSizeOfDelete(delete); // currentDeletesBufferSize += delete.heapSize();
+    deletesBuffer.add(new Delete(delete));
+    while (currentDeletesBufferSize > deletesBufferSize) {
+      flushDeletes();
+    }
+  }
+
+  private void flushDeletes() throws IOException {
+    try {
+      m_table.delete(deletesBuffer); // successful deletes are removed from deletesBuffer
+    } finally {
+      currentDeletesBufferSize = 0;
+      for (Delete delete : deletesBuffer) {
+        currentDeletesBufferSize += heapSizeOfDelete(delete); // currentDeletesBufferSize += delete.heapSize();
+      }
+    }
+  }
+
+  // this all goes away in newer HBase versions, where Delete has a heapSize()
+  private static long heapSizeOfDelete(Delete delete) {
+    long heapsize = ClassSize.align(
+        // This
+        ClassSize.OBJECT +
+        // row + OperationWithAttributes.attributes
+        2 * ClassSize.REFERENCE +
+        // Timestamp
+        1 * Bytes.SIZEOF_LONG +
+        // durability
+        ClassSize.REFERENCE +
+        // familyMap
+        ClassSize.REFERENCE +
+        // familyMap
+        ClassSize.TREEMAP);
+
+    // Adding row
+    heapsize += ClassSize.align(ClassSize.ARRAY + delete.getRow().length);
 
-      case REPLACE:
-        m_table.delete(new Delete(value.getRow()));
-        break;
+    heapsize += ClassSize.align(delete.getFamilyMap().size() * ClassSize.MAP_ENTRY);
+    for (Map.Entry<byte[], List<KeyValue>> entry : delete.getFamilyMap().entrySet()) {
+      // Adding key overhead
+      heapsize += ClassSize.align(ClassSize.ARRAY + entry.getKey().length);
 
-      default:
-        throw new IOException("Unknown Sink Mode : " + m_sinkMode);
+      // This part is kinda tricky since the JVM can reuse references if you
+      // store the same value, but have a good match with SizeOf at the moment
+      // Adding value overhead
+      heapsize += ClassSize.align(ClassSize.ARRAYLIST);
+      int size = entry.getValue().size();
+      heapsize += ClassSize.align(ClassSize.ARRAY + size * ClassSize.REFERENCE);
+      for (KeyValue kv : entry.getValue()) {
+        heapsize += kv.heapSize();
+      }
     }
+    return heapsize;
   }
 }
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index 6f04f01..7045696 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -22,8 +22,10 @@
 import cascading.tuple.Tuple;
 import cascading.tuple.TupleEntry;
 import cascading.util.Util;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobConf;
@@ -249,31 +251,68 @@ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall)
+    if (put.size() > 0) {
+      outputCollector.collect(null, put);
+    }
+    if (delete.size() > 0) {
+      outputCollector.collect(null, delete);
+    }
+    if (deleteRow == true) {
+      if (put.size() > 0 || delete.size() > 0) {
+        throw new RuntimeException("can not combine row delete with any other operations on row");
+      }
+      outputCollector.collect(null, delete);
+    }
   }
-
+
   @Override
   public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
     conf.setOutputFormat(HBaseOutputFormat.class);
 
     conf.setOutputKeyClass(ImmutableBytesWritable.class);
-    conf.setOutputValueClass(Put.class);
+    conf.setOutputValueClass(Mutation.class);
 
     String tableName = conf.get(HBaseOutputFormat.OUTPUT_TABLE);
     useSalt = conf.getBoolean(String.format(HBaseConstants.USE_SALT, tableName), false);
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
index 31ed3ea..70df798 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -19,6 +19,19 @@ class HBasePipeWrapper (pipe: Pipe) {
     }
   }
 
+  def toHBaseOperation(f: Fields): Pipe = {
+    asList(f)
+      .foldLeft(pipe){ (p, f) => {
+        p.map(f.toString -> f.toString){ from: String => from match {
+          case "__DELETE_COLUMN__" => HBaseOperation.DELETE_COLUMN
+          case "__DELETE_FAMILY__" => HBaseOperation.DELETE_FAMILY
+          case "__DELETE_ROW__" => HBaseOperation.DELETE_ROW
+          case null => HBaseOperation.NO_OP
+          case x => new HBaseOperation.PutColumn(new ImmutableBytesWritable(Bytes.toBytes(x)))
+        }}
+      }}
+  }
+
 //  def toBytesWritable : Pipe = {
 //    asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
 //      p.map(f.toString -> f.toString){ from: String => {