diff --git a/daal-utils/lib/daal-libs.tar.bz2 b/daal-utils/lib/daal-libs.tar.bz2 index c29e885cdd..85a2c3a272 100644 Binary files a/daal-utils/lib/daal-libs.tar.bz2 and b/daal-utils/lib/daal-libs.tar.bz2 differ diff --git a/daal-utils/src/main/c++/com_intel_daal_algorithms_ModelSerializer.c b/daal-utils/src/main/c++/com_intel_daal_algorithms_ModelSerializer.c index 54940b1291..595aa6283c 100644 --- a/daal-utils/src/main/c++/com_intel_daal_algorithms_ModelSerializer.c +++ b/daal-utils/src/main/c++/com_intel_daal_algorithms_ModelSerializer.c @@ -24,44 +24,50 @@ #include "com_intel_daal_algorithms_ModelSerializer.h" using namespace daal; +using namespace daal::services; /** JNI wrapper for serializing DAAL QR models to byte arrays */ JNIEXPORT jobject JNICALL Java_com_intel_daal_algorithms_ModelSerializer_cSerializeQrModel (JNIEnv *env, jclass thisClass, jlong cModel) { - services::SharedPtr dataTable; - services::SharedPtr *sharedPtr = (services::SharedPtr*)cModel; - + //get linear regression QR model + services::SharedPtr *sharedPtr = + (services::SharedPtr*)cModel; algorithms::linear_regression::ModelQR* qrModel = sharedPtr->get(); + + //serialize model data_management::InputDataArchive *dataArchive = new data_management::InputDataArchive(); qrModel->serialize(*dataArchive); - size_t size = dataArchive->getSizeOfArchive(); - byte* byteArray = new daal::byte[size]; - dataArchive->copyArchiveToArray(byteArray, size); - jobject directBuffer = env->NewDirectByteBuffer(byteArray, size); - jobject globalRef = env->NewGlobalRef(directBuffer); - delete dataArchive; - return globalRef; + size_t length = dataArchive->getSizeOfArchive(); + + byte* buffer = (byte*)daal_malloc(length); + dataArchive->copyArchiveToArray(buffer, length); + + return env->NewDirectByteBuffer(buffer, length); } /** JNI wrapper for deserializing byte arrays to DAAL QR models */ JNIEXPORT jlong JNICALL Java_com_intel_daal_algorithms_ModelSerializer_cDeserializeQrModel - (JNIEnv *env, jclass thisClass, jobject buffer, jlong bufferSize) { - jbyte* bufferPtr = (jbyte*)env->GetDirectBufferAddress(buffer); + (JNIEnv *env, jclass thisClass, jobject byteBuffer, jlong bufferSize) { + jbyte* buffer = (jbyte*)env->GetDirectBufferAddress(byteBuffer); + //Get linear regression QR model algorithms::linear_regression::ModelQR *qrModel = new algorithms::linear_regression::ModelQR(); - data_management::OutputDataArchive *dataArchive = new data_management::OutputDataArchive((daal::byte*)bufferPtr, bufferSize); + + //deserialize model + data_management::OutputDataArchive *dataArchive = + new data_management::OutputDataArchive((daal::byte*)buffer, bufferSize); qrModel->deserialize(*dataArchive); - services::SharedPtr *sharedPtr = new services::SharedPtr(qrModel); + services::SharedPtr *sharedPtr = + new services::SharedPtr(qrModel); return (jlong)sharedPtr; } /** JNI wrapper for serializing DAAL QR models to byte arrays */ JNIEXPORT void JNICALL Java_com_intel_daal_algorithms_ModelSerializer_cFreeByteBuffer - (JNIEnv *env, jclass thisClass, jobject buffer) { - daal::byte* byteArray = (daal::byte*)env->GetDirectBufferAddress(buffer); - env->DeleteGlobalRef(buffer); - delete byteArray; + (JNIEnv *env, jclass thisClass, jobject byteBuffer) { + daal::byte* buffer = (daal::byte*)env->GetDirectBufferAddress(byteBuffer); + daal_free(buffer); } diff --git a/daal-utils/src/main/java/com/intel/daal/algorithms/ModelSerializer.java b/daal-utils/src/main/java/com/intel/daal/algorithms/ModelSerializer.java index 7c90b48d75..4288afc97e 100644 --- a/daal-utils/src/main/java/com/intel/daal/algorithms/ModelSerializer.java +++ b/daal-utils/src/main/java/com/intel/daal/algorithms/ModelSerializer.java @@ -24,6 +24,14 @@ * Serializer/Deserializer for DAAL models */ public final class ModelSerializer { + + /** + * Private constructor for utility class + */ + private ModelSerializer() { + //Not called + } + static { System.loadLibrary("AtkDaalJavaAPI"); } @@ -50,7 +58,9 @@ public static byte[] serializeQrModel(com.intel.daal.algorithms.linear_regressio * @param serializedCObject Serialized model * @return Deserialized model */ - public static com.intel.daal.algorithms.linear_regression.Model deserializeQrModel(DaalContext context, byte[] serializedCObject) { + public static com.intel.daal.algorithms.linear_regression.Model deserializeQrModel( + DaalContext context, byte[] serializedCObject) { + ByteBuffer buffer = ByteBuffer.allocateDirect(serializedCObject.length); buffer.put(serializedCObject); @@ -59,9 +69,27 @@ public static com.intel.daal.algorithms.linear_regression.Model deserializeQrMod return qrModel; } + /** + * Native method for serializing DAAL linear regression QR model + * + * @param cModel Pointer to model + * @return Serialized model + */ protected static native ByteBuffer cSerializeQrModel(long cModel); + /** + * Native method for deserializing DAAL linear regression QR model + * + * @param buffer Buffer with serialized model + * @param size Buffer size + * @return Deserialized model + */ protected static native long cDeserializeQrModel(ByteBuffer buffer, long size); - private static native void cFreeByteBuffer(ByteBuffer var1); + /** + * Native method for freeing byte buffer + * + * @param byteBuffer Byte buffer + */ + private static native void cFreeByteBuffer(ByteBuffer byteBuffer); } diff --git a/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/regression/linear/DaalLinearPredictAlgorithm.scala b/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/regression/linear/DaalLinearPredictAlgorithm.scala index be9751d445..2dc26dab0e 100644 --- a/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/regression/linear/DaalLinearPredictAlgorithm.scala +++ b/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/regression/linear/DaalLinearPredictAlgorithm.scala @@ -80,8 +80,7 @@ case class DaalLinearPredictAlgorithm(modelData: DaalLinearRegressionModelData, testData: IndexedNumericTable): IndexedNumericTable = { val predictAlgorithm = new PredictionBatch(context, classOf[java.lang.Double], PredictionMethod.defaultDense) val testTable = testData.getUnpackedTable(context) - - require(testTable.getNumberOfColumns > 0 && testTable.getNumberOfRows > 0) + println(s"Predicting table ${testData.index} with numRows ${testTable.getNumberOfRows} and numCols ${testTable.getNumberOfColumns}") predictAlgorithm.input.set(PredictionInputId.data, testTable) predictAlgorithm.input.set(PredictionInputId.model, trainedModel) diff --git a/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/tables/DistributedLabeledTable.scala b/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/tables/DistributedLabeledTable.scala index d94ba9041c..3f14230e8c 100644 --- a/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/tables/DistributedLabeledTable.scala +++ b/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/tables/DistributedLabeledTable.scala @@ -55,6 +55,7 @@ object DistributedLabeledTable { val numCols = first.size val numFeatureCols = splitIndex val numLabelCols = numCols - splitIndex + val rowSumAccum = vectorRdd.sparkContext.accumulator(0L, "Row Sum Accumulator") val tableRdd = vectorRdd.mapPartitionsWithIndex { case (i, iter) => @@ -70,6 +71,8 @@ object DistributedLabeledTable { numRows += 1 } + rowSumAccum += numRows + val featureTable = new IndexedNumericTable(i, new HomogenNumericTable(context, featureBuf.toArray, numFeatureCols, numRows)) val labelTable = new IndexedNumericTable(i, new HomogenNumericTable(context, @@ -80,7 +83,7 @@ object DistributedLabeledTable { Array(indexedTable).toIterator }.filter(_.features.numRows > 0) - val totalRows = tableRdd.map(table => table.features.numRows.toLong).sum().toLong + val totalRows = rowSumAccum.value DistributedLabeledTable(tableRdd, totalRows) } diff --git a/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/tables/DistributedNumericTable.scala b/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/tables/DistributedNumericTable.scala index 924ec05734..2df6766475 100644 --- a/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/tables/DistributedNumericTable.scala +++ b/engine-plugins/daal-plugins/src/main/scala/org/trustedanalytics/atk/engine/daal/plugins/tables/DistributedNumericTable.scala @@ -75,7 +75,7 @@ object DistributedNumericTable { * @return distributed numeric table */ def createTable(vectorRdd: RDD[Vector]): DistributedNumericTable = { - + val rowSumAccum = vectorRdd.sparkContext.accumulator(0L, "Row Sum Accumulator") val tableRdd = vectorRdd.mapPartitionsWithIndex { case (i, iter) => val indexedTable: IndexedNumericTable = withDaalContext { context => @@ -90,6 +90,7 @@ object DistributedNumericTable { numRows += 1 } + rowSumAccum += numRows val table = new HomogenNumericTable(context, buf.toArray, numElements / numRows, numRows) new IndexedNumericTable(i, table) }.elseError("Could not convert numeric table to vector RDD") @@ -97,7 +98,7 @@ object DistributedNumericTable { Array(indexedTable).toIterator }.filter(_.numRows > 0) - val totalRows = tableRdd.map(table => table.numRows).sum().toLong + val totalRows = rowSumAccum.value DistributedNumericTable(tableRdd, totalRows) }