diff --git a/engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/atk/plugins/MLJsonProtocol.scala b/engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/atk/plugins/MLJsonProtocol.scala index e8b7bab408..97c311306c 100644 --- a/engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/atk/plugins/MLJsonProtocol.scala +++ b/engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/atk/plugins/MLJsonProtocol.scala @@ -16,7 +16,7 @@ package org.apache.spark.ml.atk.plugins -import org.apache.spark.ml.regression.CoxModel +import org.apache.spark.ml.regression.CoxPhModel import org.apache.spark.mllib.atk.plugins.MLLibJsonProtocol import org.apache.spark.mllib.atk.plugins.MLLibJsonProtocol.VectorFormat import org.trustedanalytics.atk.domain.DomainJsonProtocol._ @@ -52,9 +52,9 @@ object MLJsonProtocol { } } - implicit object CoxModelFormat extends JsonFormat[CoxModel] { + implicit object CoxModelFormat extends JsonFormat[CoxPhModel] { - override def write(obj: CoxModel): JsValue = { + override def write(obj: CoxPhModel): JsValue = { val beta = VectorFormat.write(obj.beta) val mean = VectorFormat.write(obj.meanVector) JsObject( @@ -64,7 +64,7 @@ object MLJsonProtocol { ) } - override def read(json: JsValue): org.apache.spark.ml.regression.CoxModel = { + override def read(json: JsValue): org.apache.spark.ml.regression.CoxPhModel = { val fields = json.asJsObject.fields val uid = getOrInvalid(fields, "uid").convertTo[String] val beta = fields.get("beta").map(v => { @@ -75,7 +75,7 @@ object MLJsonProtocol { VectorFormat.read(v) }).get - new CoxModel(uid, beta, mean) + new CoxPhModel(uid, beta, mean) } } diff --git a/engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/regression/Cox.scala b/engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/regression/CoxPh.scala similarity index 53% rename from engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/regression/Cox.scala rename to engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/regression/CoxPh.scala index ab8c10cfcb..8ff8060e63 100644 --- a/engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/regression/Cox.scala +++ b/engine-plugins/model-plugins/src/main/scala/org/apache/spark/ml/regression/CoxPh.scala @@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer import scala.collection.{ Map, mutable } -private[regression] trait CoxParams extends Params +private[regression] trait CoxPhParams extends Params with HasFeaturesCol with HasLabelCol with HasPredictionCol with HasMaxIter with HasTol with HasFitIntercept with Logging { @@ -73,11 +73,11 @@ private[regression] trait CoxParams extends Params } } -class Cox(override val uid: String) - extends Estimator[CoxModel] with CoxParams +class CoxPh(override val uid: String) + extends Estimator[CoxPhModel] with CoxPhParams with DefaultParamsWritable with Logging { - def this() = this(Identifiable.randomUID("coxSurvivalModel")) + def this() = this(Identifiable.randomUID("coxPhSurvivalModel")) /** @group setParam */ def setFeaturesCol(value: String): this.type = set(featuresCol, value) @@ -111,16 +111,16 @@ class Cox(override val uid: String) setDefault(tol -> 1E-6) - override def fit(dataFrame: DataFrame): CoxModel = { + override def fit(dataFrame: DataFrame): CoxPhModel = { val numFeatures = dataFrame.select($(featuresCol)).take(1)(0).getAs[Vector](0).size val meanVector = computeFeatureMean(dataFrame) import breeze.linalg._ - val coxPointRdd = 
extractSortedCoxPointRdd(dataFrame)
+    val coxPhPointRdd = extractSortedCoxPhPointRdd(dataFrame)
     val handlePersistence = dataFrame.rdd.getStorageLevel == StorageLevel.NONE
-    if (handlePersistence) coxPointRdd.persist(StorageLevel.MEMORY_AND_DISK)
-    val costFun = new CoxCostFun(coxPointRdd)
+    if (handlePersistence) coxPhPointRdd.persist(StorageLevel.MEMORY_AND_DISK)
+    val costFun = new CoxPhCostFun(coxPhPointRdd)
 
     var previousBeta = DenseVector.zeros[Double](numFeatures)
     var previousLoss = 1E-3
@@ -129,14 +129,23 @@ class Cox(override val uid: String)
     while (iterations < $(maxIter) && (epsilon > $(tol))) {
       val (currentLoss, currentGradient, currentInformationMatrix) = costFun.calculate(previousBeta)
-      previousBeta = if (currentInformationMatrix == 0) previousBeta else previousBeta - (currentGradient / currentInformationMatrix)
+
+      try {
+        val realMatrix = breeze.linalg.pinv(currentInformationMatrix)
+        val gradientTimesMatrixInverse: DenseMatrix[Double] = currentGradient.toDenseMatrix * realMatrix
+        val updatedBetaAsMatrix: DenseMatrix[Double] = previousBeta.toDenseMatrix - gradientTimesMatrixInverse
+        previousBeta = updatedBetaAsMatrix.toDenseVector
+      }
+      catch {
+        case _: MatrixSingularException => throw new MatrixSingularException("Singular matrix formed, cannot be inverted at iteration: " + iterations)
+      }
       epsilon = math.abs(currentLoss - previousLoss)
       previousLoss = currentLoss
       iterations += 1
     }
     val coefficients = Vectors.dense(previousBeta.toArray)
-    val model = new CoxModel(uid, coefficients, meanVector)
+    val model = new CoxPhModel(uid, coefficients, meanVector)
     copyValues(model.setParent(this))
   }
 
@@ -144,17 +153,17 @@
     validateAndTransformSchema(schema, fitting = true)
   }
 
-  override def copy(extra: ParamMap): Cox = defaultCopy(extra)
+  override def copy(extra: ParamMap): CoxPh = defaultCopy(extra)
 
   /**
    * Extract [[featuresCol]], [[labelCol]] and [[censorCol]] from input dataFrame,
    * and put it in an RDD of CoxPoint sorted in descending order of time.
    */
-  protected[ml] def extractSortedCoxPointRdd(dataFrame: DataFrame): RDD[CoxPoint] = {
+  protected[ml] def extractSortedCoxPhPointRdd(dataFrame: DataFrame): RDD[CoxPhPoint] = {
     val rdd = dataFrame.select($(featuresCol), $(labelCol), $(censorCol)).map {
       case Row(features: Vector, time: Double, censor: Double) =>
-        CoxPoint(features, time, censor)
+        CoxPhPoint(features, time, censor)
     }
     rdd.sortBy(_.time, false)
   }
@@ -177,7 +186,7 @@ class Cox(override val uid: String)
 
     val combOp = (c1: MultivariateOnlineSummarizer, c2: MultivariateOnlineSummarizer) => {
-      (c1.merge(c2))
+      c1.merge(c2)
       c1
     }
 
@@ -188,18 +197,18 @@
   }
 }
 
-object Cox extends DefaultParamsReadable[Cox] {
+object CoxPh extends DefaultParamsReadable[CoxPh] {
 
-  override def load(path: String): Cox = super.load(path)
+  override def load(path: String): CoxPh = super.load(path)
 }
 
 /**
- * Model produced by [[Cox]].
+ * Model produced by [[CoxPh]].
 */
-class CoxModel(override val uid: String,
-               val beta: Vector,
-               val meanVector: Vector)
-    extends Model[CoxModel] with CoxParams with MLWritable {
+class CoxPhModel(override val uid: String,
+                 val beta: Vector,
+                 val meanVector: Vector)
+    extends Model[CoxPhModel] with CoxPhParams with MLWritable {
 
   /** @group setParam */
   def setFeaturesCol(value: String): this.type = set(featuresCol, value)
 
@@ -224,24 +233,23 @@ class CoxModel(override val uid: String,
     validateAndTransformSchema(schema, fitting = false)
   }
 
-  override def copy(extra: ParamMap): CoxModel = {
-    copyValues(new CoxModel(uid, beta, meanVector), extra)
+  override def copy(extra: ParamMap): CoxPhModel = {
+    copyValues(new CoxPhModel(uid, beta, meanVector), extra)
       .setParent(parent)
   }
 
   override def write: MLWriter =
-    new CoxModel.CoxModelWriter(this)
+    new CoxPhModel.CoxPhModelWriter(this)
 }
 
-object CoxModel extends MLReadable[CoxModel] {
+object CoxPhModel extends MLReadable[CoxPhModel] {
 
-  override def read: MLReader[CoxModel] = new CoxModelReader
+  override def read: MLReader[CoxPhModel] = new CoxPhModelReader
 
-  override def load(path: String): CoxModel = super.load(path)
-
-  /** [[MLWriter]] instance for [[CoxModel]] */
-  private[CoxModel] class CoxModelWriter(instance: CoxModel) extends MLWriter with Logging {
+  override def load(path: String): CoxPhModel = super.load(path)
 
+  /** [[MLWriter]] instance for [[CoxPhModel]] */
+  private[CoxPhModel] class CoxPhModelWriter(instance: CoxPhModel) extends MLWriter with Logging {
     private case class Data(coefficients: Vector)
 
     override protected def saveImpl(path: String): Unit = {
@@ -252,12 +260,12 @@ object CoxModel extends MLReadable[CoxModel] {
     }
   }
 
-  private class CoxModelReader extends MLReader[CoxModel] {
+  private class CoxPhModelReader extends MLReader[CoxPhModel] {
 
     /** Checked against metadata when loading model */
-    private val className = classOf[CoxModel].getName
+    private val className = classOf[CoxPhModel].getName
 
-    override def load(path: String): CoxModel = {
+    override def load(path: String): CoxPhModel = {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
 
       val dataPath = new Path(path, "data").toString
@@ -265,7 +273,7 @@ object CoxModel extends MLReadable[CoxModel] {
        .select("coefficients").head()
       val coefficients = data.getAs[Vector](0)
       val mean = data.getAs[Vector](1)
-      val model = new CoxModel(metadata.uid, coefficients, mean)
+      val model = new CoxPhModel(metadata.uid, coefficients, mean)
 
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
@@ -274,75 +282,135 @@
 }
 
-private class CoxAggregator(parameters: BDV[Double])
+/**
+ * CoxPhAggregator computes the loss, gradient and information matrix for the CoxPh loss function used in CoxPh survival
+ * analysis, for samples in a dense vector, in an online fashion.
+ *
+ * Two CoxPhAggregators can be merged together to produce a summary of the loss, gradient and information matrix of the
+ * corresponding joint dataset.
+ *
+ * Given covariate values x_i, for random lifetimes t_i of subjects i = 1, ..., n, with corresponding
+ * censoring indicators censor_i, the log-likelihood loss function under the CoxPh model is given as:
+ * {
+ *   L(\beta) = \sum_{i=1}^n [(\beta.x_i - log(\sum_{j \in R(t_i)} e^{\beta.x_j})) censor_i]
+ * }
+ * where R(t) denotes the risk set at time t: the set of all individuals j with t_j >= t, i.e. those who have not yet
+ * died or been censored.
+ *
+ * The gradient vector is computed by taking the first-order partial derivatives of the above function with respect to beta_1, ..., beta_k.
+ *
+ * The gradient vector of size 'k' is thus computed as:
+ * {
+ *   G(\beta_k) = \sum_{i=1}^n [(x_{ik} - \frac{\sum_{j \in R(t_i)} x_{jk} e^{\beta.x_j}}
+ *   {\sum_{j \in R(t_i)} e^{\beta.x_j}}) censor_i]
+ * }
+ * The information matrix of dimensions k*k is given as:
+ * {
+ *   I(a,b) = -\sum_{i=1}^n [\frac{((\sum_{j \in R(t_i)} e^{\beta.x_j})(\sum_{j \in R(t_i)} x_{ja} x_{jb} e^{\beta.x_j}) -
+ *   (\sum_{j \in R(t_i)} x_{ja} e^{\beta.x_j})(\sum_{j \in R(t_i)} x_{jb} e^{\beta.x_j})) censor_i}{(\sum_{j \in R(t_i)} e^{\beta.x_j})^2}]
+ * }
+ *
+ * @param parameters Coefficient vector (beta) at which the loss, gradient and information matrix are evaluated
+ */
+private class CoxPhAggregator(parameters: BDV[Double])
   extends Serializable {
 
-  //TODO: Need to update these calculations for multivariate implementation
-
   private val beta = parameters
   private var totalCnt: Long = 0L
   private var lossSum = 0.0
-  private var secondOrderDerivative = 0.0
   private var gradientBetaSum = BDV.zeros[Double](beta.length)
+  private var matrixSum = breeze.linalg.DenseMatrix.zeros[Double](beta.length, beta.length)
 
   def count: Long = totalCnt
   def loss: Double = lossSum
   def gradient: BDV[Double] = gradientBetaSum
-  def informationMatrix: Double = secondOrderDerivative
+  def informationMatrix: breeze.linalg.DenseMatrix[Double] = matrixSum
+
   /**
-   * Add a new training data to this CoxAggregator, and update the loss and gradient
+   * Add a new training data point to this CoxPhAggregator, and update the loss and gradient
    * of the objective function.
-   * @param data The CoxPoint representation for one data point to be added into this aggregator.
-   * @return This CoxAggregator object.
+   * @param data The CoxPhPoint representation for one data point to be added into this aggregator.
+   * @return This CoxPhAggregator object.
 */
-  def add(data: CoxPointWithMetaData): this.type = {
+  def add(data: CoxPhPointWithMetaData): this.type = {
     val epsilon = math.log(data.sumEBetaX)
     val betaX: Double = beta.dot(data.features.toBreeze)
-    lossSum += (betaX - epsilon) * data.censor
-
-    val rhs: BDV[Double] = if (data.sumEBetaX == 0) BDV(0d) else data.sumXDotEBetaX :/ data.sumEBetaX
-    val diff = data.features.toBreeze - rhs
-    gradientBetaSum += diff :* data.censor
-
-    val numeratorA: Double = data.sumXDotEBetaX.dot(data.sumXDotEBetaX)
-    val numeratorB: Double = data.sumEBetaX * data.sumXSquaredEBetaX
-    val numerator = numeratorA - numeratorB
-    val value = if (data.sumEBetaX == 0) numerator * data.censor else (numerator / (data.sumEBetaX * data.sumEBetaX)) * data.censor
-    secondOrderDerivative += value
+    if (data.censor != 0.0) {
+      lossSum += (betaX - epsilon)
+      gradientBetaSum += computeGradientVector(data)
+      matrixSum += computeInformationMatrix(data)
+    }
     totalCnt += 1
     this
   }
 
   /**
-   * Merge another CoxAggregator, and update the loss and gradient
+   * Compute the gradient for the given observation
+   * @param data CoxPhPointWithMetaData storing the observation with its risk set values
+   * @return Breeze DenseVector storing the gradient values
+   */
+  def computeGradientVector(data: CoxPhPointWithMetaData): BDV[Double] = {
+    val gradientVector = BDV.zeros[Double](beta.length)
+
+    for (i <- 0 to beta.length - 1) {
+      if (data.sumEBetaX != 0.0)
+        gradientVector(i) = data.features(i) - data.sumXDotEBetaX(i) / data.sumEBetaX
+      else
+        gradientVector(i) = 0.0
+    }
+    gradientVector
+  }
+
+  /**
+   * Compute the information matrix for the given observation
+   * @param data CoxPhPointWithMetaData storing the observation with its risk set values
+   * @return Breeze DenseMatrix storing the information matrix values
+   */
+  def computeInformationMatrix(data: CoxPhPointWithMetaData): breeze.linalg.DenseMatrix[Double] = {
+    val infoMatrix = breeze.linalg.DenseMatrix.zeros[Double](beta.length, beta.length)
+
+    for (i <- 0 to beta.length - 1) {
+      for (j <- 0 to beta.length - 1) {
+        if (data.sumEBetaX != 0) {
+          val numerator1 = -(data.sumEBetaX * data.sumXiXjEBetaX(i, j))
+          val numerator2 = data.sumXDotEBetaX(i) * data.sumXDotEBetaX(j)
+          val denominator = data.sumEBetaX * data.sumEBetaX
+          infoMatrix(i, j) = (numerator1 + numerator2) / denominator
+        }
+        else
+          infoMatrix(i, j) = 0.0
+      }
+    }
+    infoMatrix
+  }
+
+  /**
+   * Merge another CoxPhAggregator, and update the loss, gradient and information matrix
    * of the objective function.
    * (Note that it's in place merging; as a result, `this` object will be modified.)
    *
-   * @param other The other CoxAggregator to be merged.
-   * @return This Coxggregator object.
+   * @param other The other CoxPhAggregator to be merged.
+   * @return This CoxPhAggregator object.
    */
-  def merge(other: CoxAggregator): this.type = {
+  def merge(other: CoxPhAggregator): this.type = {
     totalCnt += other.totalCnt
     lossSum += other.lossSum
     gradientBetaSum += other.gradientBetaSum
-    secondOrderDerivative += other.secondOrderDerivative
+    matrixSum += other.matrixSum
     this
   }
 }
 
 /**
- * CoxCostFun implements our distributed version of Newton Raphson for Cox cost.
+ * CoxPhCostFun implements our distributed version of Newton Raphson for CoxPh cost.
  * It returns the loss, gradient and information matrix at a particular point (parameters).
 * It's used in Breeze's convex optimization routines.
 */
-private class CoxCostFun(coxPointRdd: RDD[CoxPoint]) {
-
-  def calculate(currentBeta: BDV[Double]): (Double, BDV[Double], Double) = {
+private class CoxPhCostFun(coxPhPointRdd: RDD[CoxPhPoint]) {
 
-    val coxPointWithCumSumAndBetaX = extractCoxPointsWithMetaData(coxPointRdd, currentBeta)
+  def calculate(currentBeta: BDV[Double]): (Double, BDV[Double], breeze.linalg.DenseMatrix[Double]) = {
 
-    val coxAggregator = coxPointWithCumSumAndBetaX.treeAggregate(new CoxAggregator(currentBeta))(
+    val coxPhPointWithCumSumAndBetaX = extractCoxPhPointsWithMetaData(coxPhPointRdd, currentBeta)
+    val coxPhAggregator = coxPhPointWithCumSumAndBetaX.treeAggregate(new CoxPhAggregator(currentBeta))(
       seqOp = (c, v) => (c, v) match {
        case (aggregator, instance) => aggregator.add(instance)
      },
@@ -350,60 +417,57 @@ private class CoxCostFun(coxPointRdd: RDD[CoxPoint]) {
       case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
     })
 
-    (coxAggregator.loss, coxAggregator.gradient, coxAggregator.informationMatrix)
+    (coxPhAggregator.loss, coxPhAggregator.gradient, coxPhAggregator.informationMatrix)
   }
 
   /**
-   * Computes additional parameters given CoxPoint and intial beta to be used by Newton Raphson to estimate new beta
-   * @param coxPointRdd Rdd storing the CoxPoint containing features, time and censor
+   * Computes additional parameters given CoxPhPoint and initial beta to be used by Newton Raphson to estimate new beta
+   * @param coxPhPointRdd Rdd storing the CoxPhPoint containing features, time and censor
    * @param currentBeta The current value for beta
-   * @return Rdd storing CoxPoint and sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXSquaredEBetaX in addition
+   * @return Rdd storing each CoxPhPoint along with its sumEBetaX, xEBetaX, eBetaX, sumXEBetaX and sumXiXjEBetaX values
   */
-  protected[ml] def extractCoxPointsWithMetaData(coxPointRdd: RDD[CoxPoint], currentBeta: BDV[Double]): RDD[CoxPointWithMetaData] = {
+  protected[ml] def extractCoxPhPointsWithMetaData(coxPhPointRdd: RDD[CoxPhPoint], currentBeta: BDV[Double]): RDD[CoxPhPointWithMetaData] = {
+
+    val sc = coxPhPointRdd.sparkContext
+    val riskSetRdd = riskSet(coxPhPointRdd, currentBeta)
 
-    val sc = coxPointRdd.sparkContext
-    val riskSetRdd = riskSet(coxPointRdd, currentBeta)
     val rRdd = riskSetRdd.map(x => (x._1, x._4, x._5))
     val cumulativeSum = computePartitionSum(rRdd, currentBeta.length)
     val broadCastCumulativeSum = sc.broadcast(cumulativeSum)
     val finalRisk = computeFinalR(riskSetRdd, broadCastCumulativeSum)
-    val updatedCoxPoint = coxPointRdd.zip(finalRisk).map { case (a, (sumR, xR, r, sumS, sumT)) => CoxPointWithMetaData(a.features, a.time, a.censor, sumR, xR, r, sumS, sumT) }
+    val updatedCoxPhPoint = coxPhPointRdd.zip(finalRisk).map { case (a, (sumR, xR, r, sumS, sumT)) => CoxPhPointWithMetaData(a.features, a.time, a.censor, sumR, xR, r, sumS, sumT) }
 
-    updatedCoxPoint
+    updatedCoxPhPoint
   }
 
-  import breeze.linalg.DenseVector
-
   /**
-   * Returns the sum of each partition for the sumEBetaX, sumXEBetaX, sumXSquaredEBetaX values
-   * @param rdd Rdd containing for each observation the, sumEBetaX, sumXEBetaX, sumXSquaredEBetaX
+   * Returns the sum of each partition for the sumEBetaX, sumXEBetaX, sumXiXjEBetaX values
+   * @param rdd Rdd containing, for each observation, the sumEBetaX, sumXEBetaX and sumXiXjEBetaX values
    * @param length The number of co-variates
-   * @return Map storing, for each partition the sumEBetaX, sumXEBetaX, sumXSquaredEBetaX values
+   * @return Map storing, for each partition, the sumEBetaX, sumXEBetaX and sumXiXjEBetaX values
   */
-  def computePartitionSum(rdd: RDD[(Double,
BDV[Double], Double)], length: Int): scala.collection.Map[Int, (Double, BDV[Double], Double)] = { - //TODO: Consider replacing mapPartitionsWithIndex with accumulator in riskSet + def computePartitionSum(rdd: RDD[(Double, BDV[Double], breeze.linalg.DenseMatrix[Double])], length: Int): scala.collection.Map[Int, (Double, BDV[Double], breeze.linalg.DenseMatrix[Double])] = { + import breeze.linalg.DenseVector val array = rdd.mapPartitionsWithIndex { case (index, iterator) => { var sumEBetaX = 0.0 - var sumXSquaredEBetaX = 0.0 var sumXEBetaX = DenseVector.zeros[Double](length) - + var sumXiXjEBetaX = breeze.linalg.DenseMatrix.zeros[Double](length, length) while (iterator.hasNext) { - val (partialSumEBetaX, partialSumXEBetaX, partialSumXSquaredEBetaX) = iterator.next() + val (partialSumEBetaX, partialSumXEBetaX, partialSumXiXjEBetaX) = iterator.next() sumEBetaX = partialSumEBetaX sumXEBetaX = partialSumXEBetaX - sumXSquaredEBetaX = partialSumXSquaredEBetaX - + sumXiXjEBetaX = partialSumXiXjEBetaX } - val sumTuple = (index + 1, (sumEBetaX, sumXEBetaX, sumXSquaredEBetaX)) + val sumTuple = (index + 1, (sumEBetaX, sumXEBetaX, sumXiXjEBetaX)) Array(sumTuple).toIterator } }.collect() - val initTuple = (0, (0d, BDV.zeros[Double](length), 0d)) + val initTuple = (0, (0d, BDV.zeros[Double](length), breeze.linalg.DenseMatrix.zeros[Double](length, length))) val cumSum = array.scanLeft(initTuple)((x, y) => { val (xIndex, (xSumR, xSumS, xSumT)) = x val (yIndex, (ySumR, ySumS, ySumT)) = y @@ -414,28 +478,36 @@ private class CoxCostFun(coxPointRdd: RDD[CoxPoint]) { } /** - * Computes meta data using CoxPoint and current beta with one pass over the data + * Computes meta data using CoxPhPoint and current beta with one pass over the data * @param sortedData Rdd storing the features, time and censor information sorted in decreasing order on time * @param currentBeta The current beta value - * @return Rdd containing the meta data as a tuple with sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXSquaredEBetaX + * @return Rdd containing the meta data as a tuple with sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXiXjEBetaX */ - def riskSet(sortedData: RDD[CoxPoint], currentBeta: BDV[Double]): RDD[(Double, BDV[Double], Double, BDV[Double], Double)] = { + def riskSet(sortedData: RDD[CoxPhPoint], currentBeta: BDV[Double]): RDD[(Double, BDV[Double], Double, BDV[Double], breeze.linalg.DenseMatrix[Double])] = { import breeze.linalg.DenseVector val metaData = sortedData.mapPartitionsWithIndex { case (i, iter) => var sumEBetaX: Double = 0.0 - var sumXSquaredEBetaX: Double = 0.0 - var sumXEBetaX = DenseVector.zeros[Double](currentBeta.length) - val featureBuf = new ArrayBuffer[(Double, BDV[Double], Double, BDV[Double], Double)]() + var sumXiEBetaX = DenseVector.zeros[Double](currentBeta.length) + var sumXiXjEBetaX = breeze.linalg.DenseMatrix.zeros[Double](currentBeta.length, currentBeta.length) + + val featureBuf = new ArrayBuffer[(Double, BDV[Double], Double, BDV[Double], breeze.linalg.DenseMatrix[Double])]() while (iter.hasNext) { - val xj: BDV[Double] = new BDV(iter.next().features.toArray) - val eBetaX = math.exp(currentBeta.dot(xj)) - val xSquared: Double = xj.dot(xj) - sumXSquaredEBetaX += xSquared * eBetaX - sumEBetaX += eBetaX - val xEBetaX: BDV[Double] = xj * eBetaX - sumXEBetaX = xEBetaX + sumXEBetaX - val sumTuple = (sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXSquaredEBetaX) + val x: BDV[Double] = new BDV(iter.next().features.toArray) + val eBetaX = math.exp(currentBeta.dot(x)) + + val xiXjEBetaX = 
breeze.linalg.DenseMatrix.zeros[Double](currentBeta.length, currentBeta.length) + val xiEBetaX = x * eBetaX + for (i <- 0 to currentBeta.length - 1) { + for (j <- 0 to currentBeta.length - 1) + xiXjEBetaX(i, j) = x(i) * x(j) * eBetaX + } + + sumXiEBetaX = sumXiEBetaX + xiEBetaX + sumEBetaX = sumEBetaX + eBetaX + sumXiXjEBetaX = sumXiXjEBetaX + xiXjEBetaX + + val sumTuple = (sumEBetaX, xiEBetaX, eBetaX, sumXiEBetaX, sumXiXjEBetaX) featureBuf += sumTuple } featureBuf.iterator @@ -444,24 +516,25 @@ private class CoxCostFun(coxPointRdd: RDD[CoxPoint]) { } /** - * Computes the sum of sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXSquaredEBetaX across all partitions - * @param riskSetRdd Rdd containing the meta data as a tuple with sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXSquaredEBetaX + * Computes the sum of sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXiXjEBetaX across all partitions + * @param riskSetRdd Rdd containing the meta data as a tuple with sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXiXjEBetaX * @param broadcast Broadcast variable containing the sums of each partitions - * @return Rdd of the sum of sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXSquaredEBetaX across all partitions + * @return Rdd of the sum of sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXiXjEBetaX across all partitions */ - def computeFinalR(riskSetRdd: RDD[(Double, BDV[Double], Double, BDV[Double], Double)], broadcast: Broadcast[Map[Int, (Double, BDV[Double], Double)]]): RDD[(Double, BDV[Double], Double, BDV[Double], Double)] = { + def computeFinalR(riskSetRdd: RDD[(Double, BDV[Double], Double, BDV[Double], breeze.linalg.DenseMatrix[Double])], + broadcast: Broadcast[Map[Int, (Double, BDV[Double], breeze.linalg.DenseMatrix[Double])]]): RDD[(Double, BDV[Double], Double, BDV[Double], breeze.linalg.DenseMatrix[Double])] = { riskSetRdd.mapPartitionsWithIndex { case (i, iter) => val prevSumEBetaX = broadcast.value.getOrElse(i, throw new IllegalArgumentException("Previous sum e^beta.x not computed."))._1 val prevSumXEBetaX = broadcast.value.getOrElse(i, throw new IllegalArgumentException("Previous sum x.e^beta.x not computed."))._2 - val prevSumXSquaredEBetaX = broadcast.value.getOrElse(i, throw new IllegalArgumentException("Previous sum x^2.e^beta.x not computed."))._3 - val featureBuf = new ArrayBuffer[(Double, BDV[Double], Double, BDV[Double], Double)]() + val prevSumXiXjEBetaX = broadcast.value.getOrElse(i, throw new IllegalArgumentException("Previous sum xi.xj.e^beta.x not computed"))._3 + val featureBuf = new ArrayBuffer[(Double, BDV[Double], Double, BDV[Double], breeze.linalg.DenseMatrix[Double])]() while (iter.hasNext) { - val (sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXSquaredEBetaX) = iter.next() + val (sumEBetaX, xEBetaX, eBetaX, sumXEBetaX, sumXiXjEBetaX) = iter.next() val updatedSumEBetaX = sumEBetaX + prevSumEBetaX val updatedSumXEBetaX = sumXEBetaX + prevSumXEBetaX - val updatedSumXSquaredEBetaX = sumXSquaredEBetaX + prevSumXSquaredEBetaX - val sumTuple = (updatedSumEBetaX, xEBetaX, eBetaX, updatedSumXEBetaX, updatedSumXSquaredEBetaX) + val updatedSumXiXjEBetaX = sumXiXjEBetaX + prevSumXiXjEBetaX + val sumTuple = (updatedSumEBetaX, xEBetaX, eBetaX, updatedSumXEBetaX, updatedSumXiXjEBetaX) featureBuf += sumTuple } @@ -478,8 +551,8 @@ private class CoxCostFun(coxPointRdd: RDD[CoxPoint]) { * @param censor Indicator of the event has occurred or not. If the value is 1, it means * the event has occurred i.e. uncensored; otherwise censored. 
*/ -private[regression] case class CoxPoint(features: Vector, time: Double, censor: Double) { - require(censor == 1.0 || censor == 0.0, "censor of class CoxPoint must be 1.0 or 0.0") +private[regression] case class CoxPhPoint(features: Vector, time: Double, censor: Double) { + require(censor == 1.0 || censor == 0.0, "censor of class CoxPhPoint must be 1.0 or 0.0") } /** @@ -490,14 +563,14 @@ private[regression] case class CoxPoint(features: Vector, time: Double, censor: * @param sumEBetaX Sum of e raised to the dot product of beta and features, for all observations in the risk set of an observation * @param xDotEBetaX Dot product of feature and e raised to the dot product of beta and features * @param eBetaX e raised to dot product of beta and features - * @param sumXDotEBetaX Sum of Dot product of feature and e raised to the dot product of beta and features, for all observations in the risk set of an observatiiiion - * @param sumXSquaredEBetaX Sum of Dot product of square of the feature and e raised to the dot product of beta and features, for all observations in the risk set of an observatiiiion + * @param sumXDotEBetaX Sum of Dot product of feature and e raised to the dot product of beta and features, for all observations in the risk set of an observation */ -case class CoxPointWithMetaData(features: Vector, - time: Double, - censor: Double, - sumEBetaX: Double, - xDotEBetaX: BDV[Double], - eBetaX: Double, - sumXDotEBetaX: BDV[Double], - sumXSquaredEBetaX: Double) +case class CoxPhPointWithMetaData(features: Vector, + time: Double, + censor: Double, + sumEBetaX: Double, + xDotEBetaX: BDV[Double], + eBetaX: Double, + sumXDotEBetaX: BDV[Double], + sumXiXjEBetaX: breeze.linalg.DenseMatrix[Double]) + diff --git a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/clustering/PowerIterationClusteringPlugin.scala b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/clustering/PowerIterationClusteringPlugin.scala index 34b3a7ec50..c4236b8530 100644 --- a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/clustering/PowerIterationClusteringPlugin.scala +++ b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/clustering/PowerIterationClusteringPlugin.scala @@ -110,6 +110,7 @@ class PowerIterationClusteringPlugin extends SparkCommandPlugin[PowerIterationCl val frameRdd = FrameRdd.toFrameRdd(schema, rdd) val frameReference = engine.frames.tryNewFrame(CreateEntityArgs(description = Some("created by PIC operation"))) { newPredictedFrame: FrameEntity => newPredictedFrame.save(frameRdd) + } val k = output.k val clusterSize = rdd.map(row => ("Cluster:" + row(1).toString, 1)).reduceByKey(_ + _).collect().toMap diff --git a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/collaborativefiltering/CollaborativeFilteringPredictPlugin.scala b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/collaborativefiltering/CollaborativeFilteringPredictPlugin.scala index 84d0b3ddb5..c9b81530bf 100644 --- a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/collaborativefiltering/CollaborativeFilteringPredictPlugin.scala +++ b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/collaborativefiltering/CollaborativeFilteringPredictPlugin.scala @@ -34,6 +34,7 @@ import 
org.trustedanalytics.atk.domain.DomainJsonProtocol._ /** * Collaborative filtering predict + * */ @PluginDoc(oneLine = "Collaborative Filtering Predict (ALS).", extended = """See :ref:`Collaborative Filtering Train diff --git a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhData.scala b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhData.scala index 1846835b5d..d33891431f 100644 --- a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhData.scala +++ b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhData.scala @@ -16,9 +16,9 @@ package org.trustedanalytics.atk.engine.model.plugins.survivalanalysis -import org.apache.spark.ml.regression.CoxModel +import org.apache.spark.ml.regression.CoxPhModel -case class CoxPhData(coxModel: CoxModel, featureColumns: List[String], timeColumn: String, censorColumn: String) { +case class CoxPhData(coxModel: CoxPhModel, featureColumns: List[String], timeColumn: String, censorColumn: String) { require(featureColumns != null && featureColumns.nonEmpty, "featureColumns must not be null nor empty") require(coxModel != null, "coxModel must not be null") } diff --git a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhTrainArgs.scala b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhTrainArgs.scala index 1da0858bdf..5611117dcd 100644 --- a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhTrainArgs.scala +++ b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhTrainArgs.scala @@ -37,7 +37,6 @@ case class CoxPhTrainArgs(model: ModelReference, require(timeColumn != null && timeColumn.nonEmpty, "Time column must not be null or empty") require(censorColumn != null && censorColumn.nonEmpty, "Censor column must not be null or empty") require(covariateColumns != null && covariateColumns.nonEmpty, "Co-variate columns must not be null or empty") - require(covariateColumns.length == 1, "Current implementation is for univariate data. 
Multivariate is under implementation") require(maxSteps > 0, "Max steps must be a positive integer") } diff --git a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhTrainPlugin.scala b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhTrainPlugin.scala index adc7ac6699..71ac0c0f09 100644 --- a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhTrainPlugin.scala +++ b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/survivalanalysis/CoxPhTrainPlugin.scala @@ -16,7 +16,7 @@ package org.trustedanalytics.atk.engine.model.plugins.survivalanalysis -import org.apache.spark.ml.regression.{ Cox } +import org.apache.spark.ml.regression.CoxPh import org.trustedanalytics.atk.engine.frame.SparkFrame import org.trustedanalytics.atk.engine.model.Model import org.trustedanalytics.atk.engine.model.plugins.ModelPluginImplicits._ @@ -74,8 +74,8 @@ object CoxPhTrainPlugin { * @param arguments Arguments passed for training the LinearRegression model * @return Initialized Cox model with training arguments */ - def initializeCoxModel(arguments: CoxPhTrainArgs): Cox = { - val cox = new Cox() + def initializeCoxModel(arguments: CoxPhTrainArgs): CoxPh = { + val cox = new CoxPh() cox.setLabelCol("time") cox.setFeaturesCol("features") cox.setCensorCol("censor") diff --git a/engine-plugins/model-plugins/src/test/scala/org/apache/spark/ml/regression/CoxPhTest.scala b/engine-plugins/model-plugins/src/test/scala/org/apache/spark/ml/regression/CoxPhTest.scala new file mode 100644 index 0000000000..c5c7897e4e --- /dev/null +++ b/engine-plugins/model-plugins/src/test/scala/org/apache/spark/ml/regression/CoxPhTest.scala @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2015 Intel Corporation  + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *       http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.ml.regression + +import org.apache.spark.mllib.linalg.DenseVector +import org.scalatest.Matchers +import org.trustedanalytics.atk.testutils.MatcherUtils._ +import org.trustedanalytics.atk.testutils.TestingSparkContextFlatSpec +import breeze.linalg.{ DenseVector => BDV, * } +import breeze.linalg.{ DenseMatrix => BDM, * } + +class CoxPhTest extends TestingSparkContextFlatSpec with Matchers { + + val sortedCoxPointArray = Array(new CoxPhPoint(new DenseVector(Array(18d, 42d)), 6d, 1d), + new CoxPhPoint(new DenseVector(Array(19d, 79d)), 5d, 1d), + new CoxPhPoint(new DenseVector(Array(6d, 46d)), 4d, 1d), + new CoxPhPoint(new DenseVector(Array(4d, 66d)), 3d, 1d), + new CoxPhPoint(new DenseVector(Array(0d, 90d)), 2d, 1d), + new CoxPhPoint(new DenseVector(Array(12d, 20d)), 1d, 1d), + new CoxPhPoint(new DenseVector(Array(0d, 73d)), 0d, 1d)) + + "extractCoxPointsWithMetaData in CoxCostFun with 0 beta" should "compute correct Rdd" in { + val coxRdd = sparkContext.parallelize(sortedCoxPointArray, 3) + + val currentBeta = BDV(0d, 0d) + val coxCostFun = new CoxPhCostFun(coxRdd) + val coxWithMetaData = coxCostFun.extractCoxPhPointsWithMetaData(coxRdd, currentBeta) + val coxWithMetaDataArray = coxWithMetaData.collect() + + val estimatedCoxMetaDataArray = Array( + new CoxPhPointWithMetaData(new DenseVector(Array(18d, 42d)), 6d, 1d, 1d, BDV(18d, 42d), 1d, BDV(18d, 42d), new BDM(2, 2, Array(324.0, 756.0, 756.0, 1764.0))), + new CoxPhPointWithMetaData(new DenseVector(Array(19d, 79d)), 5d, 1d, 2d, BDV(19d, 79d), 1d, BDV(37d, 121d), new BDM(2, 2, Array(685d, 2257d, 2257d, 8005d))), + new CoxPhPointWithMetaData(new DenseVector(Array(6d, 46d)), 4d, 1d, 3d, BDV(6d, 46d), 1d, BDV(43d, 167d), new BDM(2, 2, Array(721d, 2533d, 2533d, 10121d))), + new CoxPhPointWithMetaData(new DenseVector(Array(4d, 66d)), 3d, 1d, 4d, BDV(4d, 66d), 1d, BDV(47d, 233d), new BDM(2, 2, Array(737d, 2797d, 2797d, 14477d))), + new CoxPhPointWithMetaData(new DenseVector(Array(0d, 90d)), 2d, 1d, 5d, BDV(0d, 90d), 1d, BDV(47d, 323d), new BDM(2, 2, Array(737d, 2797d, 2797d, 22577d))), + new CoxPhPointWithMetaData(new DenseVector(Array(12d, 20d)), 1d, 1d, 6d, BDV(12d, 20d), 1d, BDV(59d, 343d), new BDM(2, 2, Array(881d, 3037d, 3037d, 22977d))), + new CoxPhPointWithMetaData(new DenseVector(Array(0d, 73d)), 0d, 1d, 7d, BDV(0d, 73d), 1d, BDV(59d, 416d), new BDM(2, 2, Array(881d, 3037d, 3037d, 28306d)))) + coxWithMetaDataArray shouldBe estimatedCoxMetaDataArray + } + + "computeGradientVector" should "compute correct Gradient vector" in { + val data = new CoxPhPointWithMetaData(new DenseVector(Array(4d, 66d)), 3d, 1d, 4d, BDV(4d, 66d), 1d, BDV(47d, 233d), new BDM(2, 2, Array(737d, 2797d, 2797d, 14477d))) + val estimatedGradientVector = BDV(-7.75, 7.75) + + val coxAgg = new CoxPhAggregator(BDV(0.0, 0.0)) + val computedGradientVector = coxAgg.computeGradientVector(data) + + computedGradientVector.toArray should equalWithTolerance(estimatedGradientVector.toArray) + } + + "computeInformationMatrix" should "compute Information Matrix" in { + val data = new CoxPhPointWithMetaData(new DenseVector(Array(0d, 73d)), 0d, 1d, 7d, BDV(0d, 73d), 1d, BDV(59d, 416d), new BDM(2, 2, Array(881d, 3037d, 3037d, 28306d))) + val estimatedInformationMatrix = BDM(-54.816326, 67.040816, 67.040816, -511.959183) + + val coxAgg = new CoxPhAggregator(BDV(0.0, 0.0)) + val computedInformationMatrix = coxAgg.computeInformationMatrix(data) + + computedInformationMatrix.toArray should equalWithTolerance(estimatedInformationMatrix.toArray) + } + + "calculate 
in CoxCostFun" should "compute loss and gradient" in { + val coxRdd = sparkContext.parallelize(sortedCoxPointArray, 4) + + val currentBeta = BDV(0d, 0d) + val estimatedLoss = -8.52516136 + val estimatedGradient = BDV(-31.24523, 18.38809) + val estimatedInformationMatrix = BDM(-245.321604, 100.3460941, 100.3460941, -2258.997794) + + val coxCostFun = new CoxPhCostFun(coxRdd) + val (loss, gradient, informationMatrix) = coxCostFun.calculate(currentBeta) + + loss shouldBe estimatedLoss +- 1e-6 + gradient.toArray should equalWithTolerance(estimatedGradient.toArray) + informationMatrix.toArray should equalWithTolerance(estimatedInformationMatrix.toArray) + } + +} \ No newline at end of file diff --git a/engine-plugins/model-plugins/src/test/scala/org/apache/spark/ml/regression/CoxTest.scala b/engine-plugins/model-plugins/src/test/scala/org/apache/spark/ml/regression/CoxTest.scala deleted file mode 100644 index 763516612b..0000000000 --- a/engine-plugins/model-plugins/src/test/scala/org/apache/spark/ml/regression/CoxTest.scala +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Copyright (c) 2015 Intel Corporation  - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *       http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.ml.regression - -import org.apache.spark.mllib.linalg.DenseVector -import org.scalatest.Matchers -import org.trustedanalytics.atk.testutils.MatcherUtils._ -import org.trustedanalytics.atk.testutils.TestingSparkContextFlatSpec -import breeze.linalg.{ DenseVector => BDV, * } - -class CoxTest extends TestingSparkContextFlatSpec with Matchers { - - val sortedCoxPointArray = Array(new CoxPoint(new DenseVector(Array(27.9)), 2421d, 1d), - new CoxPoint(new DenseVector(Array(28.3)), 2201d, 1d), - new CoxPoint(new DenseVector(Array(26.5)), 2065d, 1d), - new CoxPoint(new DenseVector(Array(30.7)), 1205d, 1d), - new CoxPoint(new DenseVector(Array(35.7)), 1002d, 1.0), - new CoxPoint(new DenseVector(Array(22.7)), 374d, 1.0), - new CoxPoint(new DenseVector(Array(27.1)), 189d, 1d), - new CoxPoint(new DenseVector(Array(21.5)), 98d, 1d), - new CoxPoint(new DenseVector(Array(31.4)), 6d, 1d)) - - "extractCoxPointsWithMetaData in CoxCostFun with 0 beta" should "compute correct Rdd" in { - val coxRdd = sparkContext.parallelize(sortedCoxPointArray, 2) - - val currentBeta = BDV(0d) - val coxCostFun = new CoxCostFun(coxRdd) - val coxWithMetaData = coxCostFun.extractCoxPointsWithMetaData(coxRdd, currentBeta) - val coxWithMetaDataArray = coxWithMetaData.collect() - - val estimatedCoxMetaDataArray = Array(new CoxPointWithMetaData(new DenseVector(Array(27.9)), 2421.0, 1.0, 1.0, BDV(27.9), 1.0, BDV(27.9), 778.41), - CoxPointWithMetaData(new DenseVector(Array(28.3)), 2201.0, 1.0, 2.0, BDV(28.3), 1.0, BDV(56.2), 1579.3), - CoxPointWithMetaData(new DenseVector(Array(26.5)), 2065.0, 1.0, 3.0, BDV(26.5), 1.0, BDV(82.7), 2281.55), - CoxPointWithMetaData(new DenseVector(Array(30.7)), 1205.0, 1.0, 4.0, BDV(30.7), 1.0, BDV(113.4), 3224.04), - CoxPointWithMetaData(new DenseVector(Array(35.7)), 1002.0, 1.0, 5.0, BDV(35.7), 1.0, 
BDV(149.10000000000002), 4498.530000000001), - CoxPointWithMetaData(new DenseVector(Array(22.7)), 374.0, 1.0, 6.0, BDV(22.7), 1.0, BDV(171.8), 5013.82), - CoxPointWithMetaData(new DenseVector(Array(27.1)), 189.0, 1.0, 7.0, BDV(27.1), 1.0, BDV(198.9), 5748.2300000000005), - CoxPointWithMetaData(new DenseVector(Array(21.5)), 98.0, 1.0, 8.0, BDV(21.5), 1.0, BDV(220.4), 6210.4800000000005), - CoxPointWithMetaData(new DenseVector(Array(31.4)), 6.0, 1.0, 9.0, BDV(31.4), 1.0, BDV(251.8), 7196.4400000000005)) - - coxWithMetaDataArray shouldBe estimatedCoxMetaDataArray - } - - "extractCoxPointsWithMetaData in CoxCostFun with -0.0326" should "compute correct Rdd" in { - val coxRdd = sparkContext.parallelize(sortedCoxPointArray, 2) - - val currentBeta = BDV(-0.0326) - val coxCostFun = new CoxCostFun(coxRdd) - val coxWithMetaData = coxCostFun.extractCoxPointsWithMetaData(coxRdd, currentBeta) - val coxWithMetaDataArray = coxWithMetaData.collect() - - val estimatedCoxMetaDataArray = Array( - new CoxPointWithMetaData(new DenseVector(Array(27.9)), 2421d, 1d, 0.4027094277702852, BDV(11.235593034790956), 0.4027094277702852, BDV(11.235593034790956), 313.47304567066766), - new CoxPointWithMetaData(new DenseVector(Array(28.3)), 2201d, 1d, 0.8002016149399473, BDV(11.249028896901438), 0.3974921871696621, BDV(22.484621931692395), 631.8205634529784), - new CoxPointWithMetaData(new DenseVector(Array(26.5)), 2065d, 1d, 1.2217165791047768, BDV(11.170146550367985), 0.4215149641648296, BDV(33.654768482060376), 927.82944703773), - new CoxPointWithMetaData(new DenseVector(Array(30.7)), 1205d, 1d, 1.5892944827817275, BDV(11.284641642882386), 0.3675779036769507, BDV(44.93941012494276), 1274.2679454742192), - new CoxPointWithMetaData(new DenseVector(Array(35.7)), 1002d, 1d, 1.9015854308015716, BDV(11.148786844308441), 0.31229094801984425, BDV(56.0881969692512), 1672.2796358160306), - new CoxPointWithMetaData(new DenseVector(Array(22.7)), 374d, 1d, 2.378689804139718, BDV(10.830269274775917), 0.47710437333814615, BDV(66.91846624402712), 1918.1267483534439), - new CoxPointWithMetaData(new DenseVector(Array(27.1)), 189d, 1d, 2.792040046893404, BDV(11.201791578624901), 0.41335024275368637, BDV(78.12025782265202), 2221.695300134179), - new CoxPointWithMetaData(new DenseVector(Array(21.5)), 98d, 1d, 3.288178624968128, BDV(10.666979428606561), 0.4961385780747238, BDV(88.78723725125857), 2451.0353578492195), - new CoxPointWithMetaData(new DenseVector(Array(31.4)), 6d, 1d, 3.647463385532477, BDV(11.281541481720554), 0.3592847605643489, BDV(100.06877873297913), 2805.275760375245)) - - coxWithMetaDataArray shouldBe estimatedCoxMetaDataArray - } - "calculate in CoxCostFun" should "compute loss and gradient" in { - val coxRdd = sparkContext.parallelize(sortedCoxPointArray, 2) - - val currentBeta = BDV(0d) - val estimatedLoss = -12.801827480081469 - val estimatedGradient = BDV(-2.5120634920635005) - val estimatedInformationMatrix = -77.12552113882519 - - val coxCostFun = new CoxCostFun(coxRdd) - val (loss, gradient, informationMatrix) = coxCostFun.calculate(currentBeta) - - loss shouldBe estimatedLoss +- 1e-6 - gradient.toArray should equalWithTolerance(estimatedGradient.toArray) - informationMatrix shouldBe estimatedInformationMatrix +- 1e-6 - } -} diff --git a/engine-plugins/model-plugins/src/test/scala/org/apache/spark/ml/util/MlTestingUtils.scala b/engine-plugins/model-plugins/src/test/scala/org/apache/spark/ml/util/MlTestingUtils.scala new file mode 100644 index 0000000000..d42f2e53fe --- /dev/null +++ 
b/engine-plugins/model-plugins/src/test/scala/org/apache/spark/ml/util/MlTestingUtils.scala @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2015 Intel Corporation  + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *       http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ml.util + +import org.apache.spark.ml.Model +import org.apache.spark.ml.param.ParamMap + +object MLTestingUtils { + def checkCopy(model: Model[_]): Unit = { + val copied = model.copy(ParamMap.empty) + .asInstanceOf[Model[_]] + assert(copied.parent.uid == model.parent.uid) + assert(copied.parent == model.parent) + } +}
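A note on the Newton-Raphson update in CoxPh.fit above: each iteration computes beta_new = beta - G(beta) * I(beta)^{-1}, using Breeze's pseudo-inverse (pinv) so that a nearly singular information matrix still yields a usable step. The update can be exercised in isolation; the sketch below mirrors the matrix shapes used in fit, and the numeric values are illustrative only (they are not taken from the tests above).

    import breeze.linalg.{ DenseMatrix, DenseVector, pinv }

    object NewtonRaphsonStepSketch extends App {
      // One Newton-Raphson step as in CoxPh.fit: treat the gradient as a 1 x k row
      // matrix, multiply by the pseudo-inverse of the k x k information matrix,
      // and subtract the resulting row from the current beta.
      def step(beta: DenseVector[Double],
               gradient: DenseVector[Double],
               information: DenseMatrix[Double]): DenseVector[Double] = {
        val inverse = pinv(information)
        val update: DenseMatrix[Double] = gradient.toDenseMatrix * inverse
        (beta.toDenseMatrix - update).toDenseVector
      }

      // Illustrative values only.
      val beta = DenseVector(0.0, 0.0)
      val gradient = DenseVector(-31.2, 18.4)
      val information = DenseMatrix((-245.3, 100.3), (100.3, -2259.0))
      println(step(beta, gradient, information)) // beta after one iteration
    }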
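computePartitionSum and computeFinalR together implement a distributed prefix sum: riskSet leaves per-partition running sums, computePartitionSum collects each partition's final total and turns the totals into offsets with scanLeft (partition i receives the grand total of partitions 0 through i-1), and computeFinalR adds the broadcast offset to every running sum in its partition. A sketch of just the offset construction, on plain Doubles rather than the (Double, vector, matrix) triples the patch carries:

    // Per-partition totals keyed by partitionIndex + 1, as emitted by
    // mapPartitionsWithIndex inside computePartitionSum.
    val localTotals: Array[(Int, Double)] = Array((1, 3.0), (2, 5.0), (3, 2.0))

    // scanLeft seeds partition 0 with 0.0 and gives every later partition the
    // sum of all partitions before it.
    val offsets: Map[Int, Double] = localTotals.scanLeft((0, 0.0)) {
      case ((_, acc), (index, total)) => (index, acc + total)
    }.toMap

    // offsets == Map(0 -> 0.0, 1 -> 3.0, 2 -> 8.0, 3 -> 10.0); e.g. partition 2
    // adds 8.0 (the combined total of partitions 0 and 1) to each running sum.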
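The MLJsonProtocol change only retargets the existing spray-json format at the renamed class, so a CoxPhModel round-trips the way a CoxModel did. A hypothetical round trip (the uid and vector values are invented for illustration):

    import org.apache.spark.ml.atk.plugins.MLJsonProtocol.CoxModelFormat
    import org.apache.spark.ml.regression.CoxPhModel
    import org.apache.spark.mllib.linalg.Vectors
    import spray.json.JsValue

    // Invented uid and coefficients; any CoxPhModel would do.
    val model = new CoxPhModel("coxPhSurvivalModel_0", Vectors.dense(0.5, -0.2), Vectors.dense(10.0, 60.0))

    val json: JsValue = CoxModelFormat.write(model) // carries the uid, beta and mean read back below
    val restored: CoxPhModel = CoxModelFormat.read(json)

    assert(restored.uid == model.uid && restored.beta == model.beta)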
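End to end, training follows the wiring in CoxPhTrainPlugin.initializeCoxModel: assemble a frame with features, time and censor columns, configure a CoxPh estimator, and fit. A hypothetical snippet; the toy rows, the in-scope sqlContext and the maxIter value are assumptions, while the setter calls and column names come from the plugin itself:

    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.ml.regression.CoxPh
    import org.apache.spark.mllib.linalg.Vectors

    // `sqlContext` is assumed to be in scope.
    val training = sqlContext.createDataFrame(Seq(
      (Vectors.dense(18.0, 42.0), 6.0, 1.0), // (features, time, censor); censor 1.0 = event observed
      (Vectors.dense(19.0, 79.0), 5.0, 1.0),
      (Vectors.dense(6.0, 46.0), 4.0, 0.0)
    )).toDF("features", "time", "censor")

    val coxPh = new CoxPh()
    coxPh.setFeaturesCol("features")
    coxPh.setLabelCol("time")
    coxPh.setCensorCol("censor")

    // maxIter supplied through a ParamMap, since no default is visible in the patch.
    val model = coxPh.fit(training, ParamMap(coxPh.maxIter -> 100))
    println(model.beta) // fitted coefficients; model.meanVector holds the feature means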