61 changes: 61 additions & 0 deletions src/main/java/parallelai/spyglass/hbase/HBaseOperation.java
@@ -0,0 +1,61 @@
package parallelai.spyglass.hbase;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

public abstract class HBaseOperation {
  public enum OperationType {
    PUT_COLUMN, DELETE_COLUMN, DELETE_FAMILY, DELETE_ROW, NO_OP
  }

  public static class PutColumn extends HBaseOperation {
    private final ImmutableBytesWritable value;

    public PutColumn(final ImmutableBytesWritable value) {
      super(OperationType.PUT_COLUMN);
      this.value = value;
    }

    public byte[] getBytes() {
      return value.get();
    }
  }

  public static class DeleteColumn extends HBaseOperation {
    private DeleteColumn() {
      super(OperationType.DELETE_COLUMN);
    }
  }

  public static class DeleteFamily extends HBaseOperation {
    private DeleteFamily() {
      super(OperationType.DELETE_FAMILY);
    }
  }

  public static class DeleteRow extends HBaseOperation {
    private DeleteRow() {
      super(OperationType.DELETE_ROW);
    }
  }

  static class NoOp extends HBaseOperation {
    private NoOp() {
      super(OperationType.NO_OP);
    }
  }

  public static final DeleteColumn DELETE_COLUMN = new DeleteColumn();
  public static final DeleteFamily DELETE_FAMILY = new DeleteFamily();
  public static final DeleteRow DELETE_ROW = new DeleteRow();
  public static final NoOp NO_OP = new NoOp();

  private final OperationType operationType;

  private HBaseOperation(final OperationType operationType) {
    this.operationType = operationType;
  }

  public OperationType getType() {
    return operationType;
  }
}
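For reviewers, a rough sketch of how a job might choose one of these operations per cell before tuples reach the HBase sink. This helper is not part of the patch: the object name, the value conventions (null / empty string), and its placement are invented for illustration; it sits in the same package only so the package-private NoOp singleton is visible.

    package parallelai.spyglass.hbase

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes

    // Hypothetical helper: null leaves the cell untouched, an empty string
    // drops just that column, anything else becomes the new cell value.
    object CellOps {
      def cellOp(value: String): HBaseOperation = value match {
        case null => HBaseOperation.NO_OP
        case ""   => HBaseOperation.DELETE_COLUMN
        case v    => new HBaseOperation.PutColumn(new ImmutableBytesWritable(Bytes.toBytes(v)))
      }
    }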
@@ -8,7 +8,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;
@@ -17,7 +17,7 @@
* Convert Map/Reduce output and write it to an HBase table
*/
public class HBaseOutputFormat extends
FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
FileOutputFormat<ImmutableBytesWritable, Mutation> implements JobConfigurable {

/** JobConf parameter that specifies the output table */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
@@ -68,4 +68,4 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job)
throw new IOException("Must specify table name");
}
}
}
}
98 changes: 87 additions & 11 deletions src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
@@ -1,12 +1,19 @@
package parallelai.spyglass.hbase;

import java.util.Map;
import java.util.List;
import java.util.LinkedList;
import java.io.IOException;

import cascading.tap.SinkMode;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;

@@ -15,9 +22,12 @@
* and write to an HBase table
*/
public class HBaseRecordWriter
implements RecordWriter<ImmutableBytesWritable, Put> {
implements RecordWriter<ImmutableBytesWritable, Mutation> {
private HTable m_table;
private SinkMode m_sinkMode = SinkMode.UPDATE;
private long deletesBufferSize;
private List<Delete> deletesBuffer = new LinkedList<Delete>();
private long currentDeletesBufferSize = 0; // in bytes

/**
* Instantiate a TableRecordWriter with the HBase HClient for writing.
@@ -26,30 +36,96 @@ public class HBaseRecordWriter
*/
public HBaseRecordWriter(HTable table) {
m_table = table;
deletesBufferSize = table.getConfiguration().getLong("spyglass.deletes.buffer", 524288);
}

public void close(Reporter reporter)
throws IOException {
while (currentDeletesBufferSize > 0) {
flushDeletes();
}
m_table.close();
}

public void setSinkMode(SinkMode sinkMode) {
m_sinkMode = sinkMode;
m_sinkMode = sinkMode;
}

public void write(ImmutableBytesWritable key,
Put value) throws IOException {
Mutation value) throws IOException {
switch(m_sinkMode) {
case UPDATE:
m_table.put(new Put(value));
break;
case UPDATE:
if (value instanceof Put) {
m_table.put(new Put((Put) value));
} else if (value instanceof Delete) {
doDelete((Delete) value);
} else {
throw new RuntimeException("unsupported mutation"); // currently append is not supported
}
break;

case REPLACE:
doDelete(new Delete(value.getRow()));
break;

default:
throw new IOException("Unknown Sink Mode : " + m_sinkMode);
}
}

  private void doDelete(Delete delete) throws IOException {
    currentDeletesBufferSize += heapSizeOfDelete(delete); // currentDeletesBufferSize += delete.heapSize();
    deletesBuffer.add(new Delete((Delete) delete));
    while (currentDeletesBufferSize > deletesBufferSize) {
      flushDeletes();
    }
  }

  private void flushDeletes() throws IOException {
    try {
      m_table.delete(deletesBuffer); // successful deletes are removed from deletesBuffer
    } finally {
      currentDeletesBufferSize = 0;
      for (Delete delete : deletesBuffer) {
        currentDeletesBufferSize += heapSizeOfDelete(delete); // currentDeletesBufferSize += delete.heapSize();
      }
    }
  }

  // this all goes away in newer HBase versions, where Delete has a heapSize()
  private static long heapSizeOfDelete(Delete delete) {
    long heapsize = ClassSize.align(
        // This
        ClassSize.OBJECT +
        // row + OperationWithAttributes.attributes
        2 * ClassSize.REFERENCE +
        // Timestamp
        1 * Bytes.SIZEOF_LONG +
        // durability
        ClassSize.REFERENCE +
        // familyMap reference
        ClassSize.REFERENCE +
        // familyMap
        ClassSize.TREEMAP);

    // Adding row
    heapsize += ClassSize.align(ClassSize.ARRAY + delete.getRow().length);

    heapsize += ClassSize.align(delete.getFamilyMap().size() * ClassSize.MAP_ENTRY);
    for (Map.Entry<byte[], List<KeyValue>> entry : delete.getFamilyMap().entrySet()) {
      // Adding key overhead
      heapsize += ClassSize.align(ClassSize.ARRAY + entry.getKey().length);

      // This part is kind of tricky since the JVM can reuse references if you
      // store the same value, but have a good match with SizeOf at the moment
      // Adding value overhead
      heapsize += ClassSize.align(ClassSize.ARRAYLIST);
      int size = entry.getValue().size();
      heapsize += ClassSize.align(ClassSize.ARRAY + size * ClassSize.REFERENCE);
      for (KeyValue kv : entry.getValue()) {
        heapsize += kv.heapSize();
      }
    }
    return heapsize;
  }
}
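One operational note: deletes are buffered and only sent once their estimated heap size exceeds spyglass.deletes.buffer (512 KB by default, per the constructor above). A sketch of overriding that, assuming you can reach the JobConf before the flow runs; the object and method names here are invented, only the key and default come from the writer itself.

    object DeleteBufferConfig {
      import org.apache.hadoop.mapred.JobConf

      // Sketch only: raise the delete buffer from the 512 KB default to 2 MB.
      // How you obtain the JobConf depends on how the job is wired up.
      def tune(conf: JobConf): Unit =
        conf.setLong("spyglass.deletes.buffer", 2L * 1024 * 1024)
    }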
59 changes: 49 additions & 10 deletions src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -22,8 +22,10 @@
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
@@ -249,31 +251,68 @@ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputColl
} else {
put = new Put(keyBytes.get(), this.timeStamp);
}

Delete delete;
if (this.timeStamp == 0L) {
delete = new Delete(keyBytes.get());
} else {
delete = new Delete(keyBytes.get(), this.timeStamp);
}
boolean deleteRow = false;

for (int i = 0; i < valueFields.length; i++) {
Fields fieldSelector = valueFields[i];
TupleEntry values = tupleEntry.selectEntry(fieldSelector);
Fields fields = values.getFields();
Tuple tuple = values.getTuple();

for (int j = 0; j < values.getFields().size(); j++) {
Fields fields = values.getFields();
Tuple tuple = values.getTuple();

ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j);
if (valueBytes != null)
put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get());
Object item = tuple.getObject(j);
if (item instanceof ImmutableBytesWritable) {
put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), ((ImmutableBytesWritable) item).get());
} else if (item != null) {
HBaseOperation op = (HBaseOperation) item;
switch(op.getType()) {
case PUT_COLUMN:
put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), ((HBaseOperation.PutColumn) item).getBytes());
break;
case DELETE_COLUMN:
delete.deleteColumns(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)));
break;
case DELETE_FAMILY:
delete.deleteFamily(Bytes.toBytes(familyNames[i]));
break;
case DELETE_ROW:
deleteRow = true;
break;
case NO_OP:
break;
}
}
}
}

outputCollector.collect(null, put);

if (put.size() > 0) {
outputCollector.collect(null, put);
}
if (delete.size() > 0) {
outputCollector.collect(null, delete);
}
if (deleteRow == true) {
if (put.size() > 0 || delete.size() > 0) {
throw new RuntimeException("can not combine row delete with any other operations on row");
}
outputCollector.collect(null, delete);
}
}

@Override
public void sinkConfInit(FlowProcess<JobConf> process,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setOutputFormat(HBaseOutputFormat.class);

conf.setOutputKeyClass(ImmutableBytesWritable.class);
conf.setOutputValueClass(Put.class);
conf.setOutputValueClass(Mutation.class);

String tableName = conf.get(HBaseOutputFormat.OUTPUT_TABLE);
useSalt = conf.getBoolean(String.format(HBaseConstants.USE_SALT, tableName), false);
13 changes: 13 additions & 0 deletions src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -19,6 +19,19 @@ class HBasePipeWrapper (pipe: Pipe) {
}
}

  def toHBaseOperation(f: Fields): Pipe = {
    asList(f)
      .foldLeft(pipe){ (p, f) => {
        p.map(f.toString -> f.toString){ from: String => from match {
          case "__DELETE_COLUMN__" => HBaseOperation.DELETE_COLUMN
          case "__DELETE_FAMILY__" => HBaseOperation.DELETE_FAMILY
          case "__DELETE_ROW__" => HBaseOperation.DELETE_ROW
          case null => HBaseOperation.NO_OP
          case x => new HBaseOperation.PutColumn(new ImmutableBytesWritable(Bytes.toBytes(x)))
        }}
      }}
  }

// def toBytesWritable : Pipe = {
// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
// p.map(f.toString -> f.toString){ from: String => {
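A usage sketch for the new conversion (the object, pipe, and field names are invented, and it assumes HBasePipeWrapper lives in parallelai.spyglass.hbase as the file path suggests): cells already holding the sentinel strings "__DELETE_COLUMN__", "__DELETE_FAMILY__" or "__DELETE_ROW__" are rewritten as the corresponding delete operations, null cells become no-ops, and everything else is wrapped as a PutColumn, ready for the HBase sink to turn into Puts and Deletes.

    object MarkForHBase {
      import cascading.pipe.Pipe
      import cascading.tuple.Fields
      import parallelai.spyglass.hbase.HBasePipeWrapper

      // Hypothetical wiring: 'marked' already carries sentinel strings or real
      // values in its cf1_col field.
      def apply(marked: Pipe): Pipe =
        new HBasePipeWrapper(marked).toHBaseOperation(new Fields("cf1_col"))
    }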