From 823d971f24409e78663ecc13e344d6bc671d13d3 Mon Sep 17 00:00:00 2001 From: Briton Barker Date: Tue, 24 May 2016 17:05:36 -0700 Subject: [PATCH 1/3] logging for jobcontext update and stale check --- .../engine/command/mgmt/YarnJobsMonitor.scala | 16 +++++++++++++--- .../interfaces/src/main/resources/reference.conf | 3 ++- .../atk/repository/SlickMetaStoreComponent.scala | 8 ++++++-- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala index 430f37b9c3..96c1c7ec8e 100644 --- a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala +++ b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala @@ -16,6 +16,7 @@ package org.trustedanalytics.atk.engine.command.mgmt import java.util.concurrent.TimeUnit +import java.net._ import org.trustedanalytics.atk.domain.jobcontext.JobContext import org.trustedanalytics.atk.engine.{ EngineConfig, Engine } @@ -28,9 +29,12 @@ import org.trustedanalytics.atk.event.EventLogging class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends Runnable with EventLogging { lazy val timeoutMinutes: Long = EngineConfig.yarnMonitorTaskTimeout + lazy val timeoutMillis: Long = timeoutMinutes * 60 * 1000 def run(): Unit = { - info(s"YarnJobsMonitor started. Task timeout is $timeoutMinutes minutes.") + val localHost = InetAddress.getLocalHost + val nowMillis = System.currentTimeMillis() + info(s"YarnJobsMonitor started on $localHost at $nowMillis total ms. Task timeout is $timeoutMinutes minutes (or $timeoutMillis ms).") while (true) { engine.getCommandsNotComplete().foreach { command => engine.getCommandJobContext(command) match { @@ -44,6 +48,12 @@ class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends R } } - def hasStaleContext(context: JobContext): Boolean = - System.currentTimeMillis() - context.modifiedOn.getMillis > timeoutMinutes * 60 * 1000 + def hasStaleContext(context: JobContext): Boolean = { + val localHost = InetAddress.getLocalHost + val nowMillis = System.currentTimeMillis() + val lastModMillis = context.modifiedOn.getMillis + info(s"YarnJobsMonitor hasStaleContext check called $localHost at $nowMillis total ms against context.modified.getMillis=$lastModMillis") + //System.currentTimeMillis() - context.modifiedOn.getMillis > timeoutMinutes * 60 * 1000 + nowMillis - lastModMillis > timeoutMillis + } } diff --git a/engine/interfaces/src/main/resources/reference.conf b/engine/interfaces/src/main/resources/reference.conf index b809d1429c..134fa1e509 100644 --- a/engine/interfaces/src/main/resources/reference.conf +++ b/engine/interfaces/src/main/resources/reference.conf @@ -88,7 +88,8 @@ trustedanalytics.atk { fs { # the system will create an "intelanalaytics" folder at this location. # All Trusted Analytics Toolkit files will be stored somewhere under that base location. - root = "hdfs://invalid-fsroot-host/user/atkuser" + //root = "hdfs://invalid-fsroot-host/user/atkuser" + root = "hdfs://paulsimon.hf.intel.com:8020/user/iauser" # Directory to load checkpoints into checkpoint-directory = ${trustedanalytics.atk.engine.fs.root}"/checkpoints" diff --git a/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala b/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala index 49e6249635..70102c378e 100644 --- a/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala +++ b/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala @@ -17,6 +17,7 @@ package org.trustedanalytics.atk.repository import org.apache.commons.dbcp.BasicDataSource +import java.net._ import com.github.tototoshi.slick.GenericJodaSupport import org.trustedanalytics.atk.domain.gc.{ GarbageCollectionEntryTemplate, GarbageCollectionEntry, GarbageCollection, GarbageCollectionTemplate } @@ -1249,8 +1250,11 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging { override def updateProgress(id: Long, progress: String)(implicit session: Session): Unit = { val columns = for (c <- jobContextTable if c.id === id) yield (c.progress, c.modifiedOn) - columns.update(Some(progress), new DateTime) - + val now = new DateTime + val localHost = InetAddress.getLocalHost + info(s"JobContextTable.updateProgress (for YarnJobsMonitor) from host $localHost at $now") + columns.update(Some(progress), now) + //columns.update(Some(progress), new DateTime) } override def updateYarnAppName(id: Long, yarnAppName: String)(implicit session: Session): Unit = { From 1c36be64ee8ba3e8e3418871c7c9b79e701fafd1 Mon Sep 17 00:00:00 2001 From: Briton Barker Date: Wed, 25 May 2016 22:06:24 -0700 Subject: [PATCH 2/3] put yarnjobsmonitor timeout details in the exception --- .../engine/command/mgmt/YarnJobsMonitor.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala index 96c1c7ec8e..904052c948 100644 --- a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala +++ b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala @@ -38,9 +38,11 @@ class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends R while (true) { engine.getCommandsNotComplete().foreach { command => engine.getCommandJobContext(command) match { - case Some(context) => if (hasStaleContext(context)) { - engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting.")) - } + case Some(context) => + val (answer, msg) = hasStaleContext(context) + if (answer) { + engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting. Details: $msg")) + } case None => ; // there is no know YARN job to shutdown (command remains not complete, but this is not the responsibility of a YARN jobs monitor } } @@ -48,12 +50,14 @@ class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends R } } - def hasStaleContext(context: JobContext): Boolean = { + def hasStaleContext(context: JobContext): (Boolean, String) = { val localHost = InetAddress.getLocalHost val nowMillis = System.currentTimeMillis() val lastModMillis = context.modifiedOn.getMillis - info(s"YarnJobsMonitor hasStaleContext check called $localHost at $nowMillis total ms against context.modified.getMillis=$lastModMillis") + val msg = s"YarnJobsMonitor hasStaleContext check called by $localHost: $nowMillis - $lastModMillis > $timeoutMillis" + info(msg) //System.currentTimeMillis() - context.modifiedOn.getMillis > timeoutMinutes * 60 * 1000 - nowMillis - lastModMillis > timeoutMillis + val answer = nowMillis - lastModMillis > timeoutMillis + (answer, msg) } } From 10a5c45f4bff722d8914643906880cef9c5516fa Mon Sep 17 00:00:00 2001 From: Briton Barker Date: Thu, 26 May 2016 16:42:28 -0700 Subject: [PATCH 3/3] Adding fix candidate to correct for timezones on metastore timestamp updates for JobContext repo --- .../engine/command/mgmt/YarnJobsMonitor.scala | 5 ++++- .../repository/SlickMetaStoreComponent.scala | 19 ++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala index 904052c948..109fa875fe 100644 --- a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala +++ b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala @@ -18,6 +18,7 @@ package org.trustedanalytics.atk.engine.command.mgmt import java.util.concurrent.TimeUnit import java.net._ +import org.joda.time.{ DateTimeZone, LocalDateTime } import org.trustedanalytics.atk.domain.jobcontext.JobContext import org.trustedanalytics.atk.engine.{ EngineConfig, Engine } import org.trustedanalytics.atk.engine.plugin.Invocation @@ -52,7 +53,9 @@ class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends R def hasStaleContext(context: JobContext): (Boolean, String) = { val localHost = InetAddress.getLocalHost - val nowMillis = System.currentTimeMillis() + //val nowMillis = System.currentTimeMillis() + val now = new LocalDateTime().toDateTime(DateTimeZone.UTC) + val nowMillis = now.getMillis val lastModMillis = context.modifiedOn.getMillis val msg = s"YarnJobsMonitor hasStaleContext check called by $localHost: $nowMillis - $lastModMillis > $timeoutMillis" info(msg) diff --git a/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala b/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala index 70102c378e..e935d5dad3 100644 --- a/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala +++ b/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala @@ -23,7 +23,7 @@ import com.github.tototoshi.slick.GenericJodaSupport import org.trustedanalytics.atk.domain.gc.{ GarbageCollectionEntryTemplate, GarbageCollectionEntry, GarbageCollection, GarbageCollectionTemplate } import org.trustedanalytics.atk.domain.jobcontext.{ JobContext, JobContextTemplate } import org.trustedanalytics.atk.domain.schema.Schema -import org.joda.time.DateTime +import org.joda.time.{ LocalDateTime, DateTimeZone, DateTime } import scala.slick.driver.JdbcDriver import org.flywaydb.core.Flyway import spray.json._ @@ -59,6 +59,8 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging { // Different versions of implicits are imported here based on the driver import genericJodaSupport._ + def getNowTime(): DateTime = new LocalDateTime().toDateTime(DateTimeZone.UTC) + // Defining mappings for custom column types implicit val schemaColumnType = MappedColumnType.base[Schema, String]( { schema => schema.toJson.prettyPrint }, // Schema to String @@ -1231,7 +1233,7 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging { } override def insert(jobContext: JobContextTemplate)(implicit session: Session): Try[JobContext] = Try { - val m = JobContext(1, jobContext.userId, None, None, jobContext.clientId, new DateTime(), new DateTime(), None, None) + val m = JobContext(1, jobContext.userId, None, None, jobContext.clientId, getNowTime(), getNowTime(), None, None) jobContextAutoInc.insert(m) } @@ -1245,21 +1247,24 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging { override def updateJobServerUri(id: Long, uri: String)(implicit session: Session): Unit = { val columns = for (c <- jobContextTable if c.id === id) yield (c.jobServerUri, c.modifiedOn) - columns.update(Some(uri), new DateTime) + columns.update(Some(uri), getNowTime()) } override def updateProgress(id: Long, progress: String)(implicit session: Session): Unit = { val columns = for (c <- jobContextTable if c.id === id) yield (c.progress, c.modifiedOn) - val now = new DateTime val localHost = InetAddress.getLocalHost - info(s"JobContextTable.updateProgress (for YarnJobsMonitor) from host $localHost at $now") + //val now = new DateTime + val now = getNowTime() + val nowMillis = now.getMillis + info(s"JobContextTable.updateProgress from host $localHost at $now ($nowMillis)") columns.update(Some(progress), now) //columns.update(Some(progress), new DateTime) + } override def updateYarnAppName(id: Long, yarnAppName: String)(implicit session: Session): Unit = { val columns = for (c <- jobContextTable if c.id === id) yield (c.yarnAppName, c.modifiedOn) - columns.update(Some(yarnAppName), new DateTime) + columns.update(Some(yarnAppName), getNowTime) } override def scan(offset: Int = 0, count: Int = defaultScanCount)(implicit session: Session): Seq[JobContext] = { @@ -1282,7 +1287,7 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging { } override def lookupRecentlyActive(seconds: Int)(implicit session: Session): Seq[JobContext] = { - val recentTime = (new DateTime).minusSeconds(seconds) + val recentTime = (getNowTime()).minusSeconds(seconds) jobContextTable.where(_.modifiedOn >= recentTime).list }