Review notes (text before the first "diff --git" header is ignored by git apply and patch):
- Timestamps use the real system instant again. The LocalDateTime-to-UTC conversion re-labelled the
  host's wall-clock fields as UTC, which shifted every value by the host's zone offset.
- reference.conf keeps its placeholder fs.root; a site-specific NameNode belongs in a deployment override.
- The local host name is only used in log text, so a failed lookup falls back to "unknown-host".
- NOTE(review): indentation and blank context lines were reconstructed from the hunk line counts;
  run "git apply --check" against the target tree before using this patch.

diff --git a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala
index 430f37b9c3..109fa875fe 100644
--- a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala
+++ b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala
@@ -16,7 +16,8 @@
 package org.trustedanalytics.atk.engine.command.mgmt
 
 import java.util.concurrent.TimeUnit
+import java.net.InetAddress
 
 import org.trustedanalytics.atk.domain.jobcontext.JobContext
 import org.trustedanalytics.atk.engine.{ EngineConfig, Engine }
 import org.trustedanalytics.atk.engine.plugin.Invocation
@@ -28,15 +29,24 @@ import org.trustedanalytics.atk.event.EventLogging
 class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends Runnable with EventLogging {
 
   lazy val timeoutMinutes: Long = EngineConfig.yarnMonitorTaskTimeout
+  lazy val timeoutMillis: Long = timeoutMinutes * 60 * 1000
+
+  // Host label for log and diagnostic text only. It is resolved once, and a failed lookup yields a
+  // fallback so that a name-resolution failure cannot stop the monitor.
+  private lazy val localHost: String =
+    scala.util.Try(InetAddress.getLocalHost.toString).getOrElse("unknown-host")
 
   def run(): Unit = {
-    info(s"YarnJobsMonitor started. Task timeout is $timeoutMinutes minutes.")
+    val nowMillis = System.currentTimeMillis()
+    info(s"YarnJobsMonitor started on $localHost at $nowMillis total ms. Task timeout is $timeoutMinutes minutes (or $timeoutMillis ms).")
     while (true) {
       engine.getCommandsNotComplete().foreach { command =>
         engine.getCommandJobContext(command) match {
-          case Some(context) => if (hasStaleContext(context)) {
-            engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting."))
-          }
+          case Some(context) =>
+            val (isStale, details) = hasStaleContext(context)
+            if (isStale) {
+              engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting. Details: $details"))
+            }
           case None => ; // there is no know YARN job to shutdown (command remains not complete, but this is not the responsibility of a YARN jobs monitor
         }
       }
@@ -44,6 +54,22 @@ class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends R
     }
   }
 
-  def hasStaleContext(context: JobContext): Boolean =
-    System.currentTimeMillis() - context.modifiedOn.getMillis > timeoutMinutes * 60 * 1000
+  /**
+   * Decide whether a job context has gone longer than the configured timeout without an update.
+   *
+   * The current time is the system clock's epoch milliseconds, the same scale that
+   * context.modifiedOn.getMillis reports, so the difference does not depend on the JVM's default
+   * time zone.
+   *
+   * @param context job context whose modifiedOn timestamp is checked
+   * @return a pair: true when (now - modifiedOn) is greater than timeoutMillis, and the diagnostic
+   *         text that was logged for this check
+   */
+  def hasStaleContext(context: JobContext): (Boolean, String) = {
+    val nowMillis = System.currentTimeMillis()
+    val lastModMillis = context.modifiedOn.getMillis
+    val msg = s"YarnJobsMonitor hasStaleContext check called by $localHost: $nowMillis - $lastModMillis > $timeoutMillis"
+    info(msg)
+    (nowMillis - lastModMillis > timeoutMillis, msg)
+  }
 }
diff --git a/engine/interfaces/src/main/resources/reference.conf b/engine/interfaces/src/main/resources/reference.conf
index b809d1429c..134fa1e509 100644
--- a/engine/interfaces/src/main/resources/reference.conf
+++ b/engine/interfaces/src/main/resources/reference.conf
@@ -88,7 +88,9 @@ trustedanalytics.atk {
     fs {
       # the system will create an "intelanalaytics" folder at this location.
       # All Trusted Analytics Toolkit files will be stored somewhere under that base location.
+      # The host below is a placeholder that does not resolve; set the real HDFS root in a
+      # deployment-specific override instead of committing a site-specific NameNode here.
       root = "hdfs://invalid-fsroot-host/user/atkuser"
 
       # Directory to load checkpoints into
       checkpoint-directory = ${trustedanalytics.atk.engine.fs.root}"/checkpoints"
diff --git a/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala b/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala
index 49e6249635..e935d5dad3 100644
--- a/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala
+++ b/engine/meta-store/src/main/scala/org/trustedanalytics/atk/repository/SlickMetaStoreComponent.scala
@@ -17,12 +17,13 @@
 package org.trustedanalytics.atk.repository
 
 import org.apache.commons.dbcp.BasicDataSource
+import java.net.InetAddress
 
 import com.github.tototoshi.slick.GenericJodaSupport
 import org.trustedanalytics.atk.domain.gc.{ GarbageCollectionEntryTemplate, GarbageCollectionEntry, GarbageCollection, GarbageCollectionTemplate }
 import org.trustedanalytics.atk.domain.jobcontext.{ JobContext, JobContextTemplate }
 import org.trustedanalytics.atk.domain.schema.Schema
-import org.joda.time.DateTime
+import org.joda.time.{ DateTimeZone, DateTime }
 import scala.slick.driver.JdbcDriver
 import org.flywaydb.core.Flyway
 import spray.json._
@@ -58,6 +59,15 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging {
   // Different versions of implicits are imported here based on the driver
   import genericJodaSupport._
 
+  /**
+   * Current time as a DateTime in the UTC zone.
+   *
+   * The value is the real current instant: new DateTime(zone) reads the system clock, so getMillis
+   * is the true epoch time on every host. Building it from LocalDateTime instead would re-label the
+   * host's wall-clock fields as UTC and shift the instant by the host's zone offset.
+   */
+  def getNowTime(): DateTime = new DateTime(DateTimeZone.UTC)
+
   // Defining mappings for custom column types
   implicit val schemaColumnType = MappedColumnType.base[Schema, String](
     { schema => schema.toJson.prettyPrint }, // Schema to String
@@ -1230,7 +1240,8 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging {
     }
 
     override def insert(jobContext: JobContextTemplate)(implicit session: Session): Try[JobContext] = Try {
-      val m = JobContext(1, jobContext.userId, None, None, jobContext.clientId, new DateTime(), new DateTime(), None, None)
+      val now = getNowTime() // read the clock once so both timestamp arguments are identical
+      val m = JobContext(1, jobContext.userId, None, None, jobContext.clientId, now, now, None, None)
       jobContextAutoInc.insert(m)
     }
 
@@ -1244,18 +1255,23 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging {
 
     override def updateJobServerUri(id: Long, uri: String)(implicit session: Session): Unit = {
       val columns = for (c <- jobContextTable if c.id === id) yield (c.jobServerUri, c.modifiedOn)
-      columns.update(Some(uri), new DateTime)
+      columns.update(Some(uri), getNowTime())
     }
 
     override def updateProgress(id: Long, progress: String)(implicit session: Session): Unit = {
       val columns = for (c <- jobContextTable if c.id === id) yield (c.progress, c.modifiedOn)
-      columns.update(Some(progress), new DateTime)
+      // The host name only appears in the log line, so a failed lookup must not fail the update.
+      val localHost = Try(InetAddress.getLocalHost.toString).getOrElse("unknown-host")
+      val now = getNowTime()
+      val nowMillis = now.getMillis
+      info(s"JobContextTable.updateProgress from host $localHost at $now ($nowMillis)")
+      columns.update(Some(progress), now)
     }
 
 
     override def updateYarnAppName(id: Long, yarnAppName: String)(implicit session: Session): Unit = {
       val columns = for (c <- jobContextTable if c.id === id) yield (c.yarnAppName, c.modifiedOn)
-      columns.update(Some(yarnAppName), new DateTime)
+      columns.update(Some(yarnAppName), getNowTime())
     }
 
     override def scan(offset: Int = 0, count: Int = defaultScanCount)(implicit session: Session): Seq[JobContext] = {
@@ -1278,7 +1294,7 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging {
     }
 
     override def lookupRecentlyActive(seconds: Int)(implicit session: Session): Seq[JobContext] = {
-      val recentTime = (new DateTime).minusSeconds(seconds)
+      val recentTime = getNowTime().minusSeconds(seconds)
       jobContextTable.where(_.modifiedOn >= recentTime).list
     }
 