diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/clustering/ClusterListener.scala b/core/amber/src/main/scala/edu/uci/ics/amber/clustering/ClusterListener.scala index 9c639946d3c..f60c0c09dc7 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/clustering/ClusterListener.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/clustering/ClusterListener.scala @@ -27,6 +27,8 @@ object ClusterListener { final case class GetAvailableNodeAddresses() var numWorkerNodesInCluster = 0 + + var currentAddresses: Set[Address] = Set.empty } class ClusterListener extends Actor with AmberLogging { @@ -117,9 +119,13 @@ class ClusterListener extends Actor with AmberLogging { case other => //skip } + ClusterListener.currentAddresses = getAllAddress.toSet numWorkerNodesInCluster = getAllAddress.size + + val addressesStr = ClusterListener.currentAddresses.map(_.toString).toSeq + SessionState.getAllSessionStates.foreach { state => - state.send(ClusterStatusUpdateEvent(numWorkerNodesInCluster)) + state.send(ClusterStatusUpdateEvent(numWorkerNodesInCluster, addressesStr)) } logger.info( diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/common/ExecutorDeployment.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/common/ExecutorDeployment.scala index e0a9d53eacd..f76c4dcded3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/common/ExecutorDeployment.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/common/ExecutorDeployment.scala @@ -1,8 +1,13 @@ package edu.uci.ics.amber.engine.architecture.common -import akka.actor.{Address, Deploy} +import akka.actor.{Address, AddressFromURIString, Deploy} import akka.remote.RemoteScope -import edu.uci.ics.amber.core.workflow.{PhysicalOp, PreferController, RoundRobinPreference} +import edu.uci.ics.amber.core.workflow.{ + GoToSpecificNode, + PhysicalOp, + PreferController, + RoundRobinPreference +} import edu.uci.ics.amber.engine.architecture.controller.execution.OperatorExecution import edu.uci.ics.amber.engine.architecture.deploysemantics.AddressInfo import edu.uci.ics.amber.engine.architecture.pythonworker.PythonWorkflowWorker @@ -38,6 +43,16 @@ object ExecutorDeployment { val preferredAddress: Address = locationPreference match { case PreferController => addressInfo.controllerAddress + case node: GoToSpecificNode => + val targetAddress = AddressFromURIString(node.nodeAddr) + addressInfo.allAddresses.find(addr => addr == targetAddress) match { + case Some(address) => address + case None => + throw new IllegalStateException( + s"Designated node address '${node.nodeAddr}' not found among available addresses: " + + addressInfo.allAddresses.map(_.host.getOrElse("None")).mkString(", ") + ) + } case RoundRobinPreference => assert( addressInfo.allAddresses.nonEmpty, diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala index 8b47e3ead99..671c07ab601 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala @@ -8,7 +8,7 @@ import edu.uci.ics.amber.clustering.ClusterListener import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor import java.io.{BufferedReader, InputStreamReader} -import java.net.URL +import java.net.{InetAddress, URL} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.FiniteDuration @@ -46,7 +46,9 @@ object AmberRuntime { try { val query = new URL("http://checkip.amazonaws.com") val in = new BufferedReader(new InputStreamReader(query.openStream())) - in.readLine() + val ip = in.readLine() + val localIp = InetAddress.getLocalHost().getHostAddress() + ip } catch { case e: Exception => throw e } @@ -56,18 +58,33 @@ object AmberRuntime { var localIpAddress = "localhost" if (clusterMode) { localIpAddress = getNodeIpAddress - } - val masterConfig = ConfigFactory - .parseString(s""" + val localPrivateIdAddress = InetAddress.getLocalHost().getHostAddress() + + val masterConfig = ConfigFactory + .parseString(s""" + akka.remote.artery.canonical.port = 2552 + akka.remote.artery.canonical.hostname = $localIpAddress + akka.remote.artery.bind.hostname = $localPrivateIdAddress + akka.remote.artery.bind.port = 2552 + akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ] + """) + .withFallback(akkaConfig) + .resolve() + AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress) + createAmberSystem(masterConfig) + } else { + val masterConfig = ConfigFactory + .parseString(s""" akka.remote.artery.canonical.port = 2552 akka.remote.artery.canonical.hostname = $localIpAddress akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ] """) - .withFallback(akkaConfig) - .resolve() - AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress) - createAmberSystem(masterConfig) + .withFallback(akkaConfig) + .resolve() + AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress) + createAmberSystem(masterConfig) + } } def akkaConfig: Config = @@ -79,18 +96,35 @@ object AmberRuntime { val addr = mainNodeAddress.getOrElse("localhost") var localIpAddress = "localhost" if (mainNodeAddress.isDefined) { + localIpAddress = getNodeIpAddress - } - val workerConfig = ConfigFactory - .parseString(s""" + + val localPrivateIdAddress = InetAddress.getLocalHost().getHostAddress() + + val workerConfig = ConfigFactory + .parseString(s""" + akka.remote.artery.canonical.hostname = $localIpAddress + akka.remote.artery.canonical.port = 0 + akka.remote.artery.bind.hostname = $localPrivateIdAddress + akka.remote.artery.bind.port = 0 + akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ] + """) + .withFallback(akkaConfig) + .resolve() + AmberConfig.masterNodeAddr = createMasterAddress(addr) + createAmberSystem(workerConfig) + } else { + val workerConfig = ConfigFactory + .parseString(s""" akka.remote.artery.canonical.hostname = $localIpAddress akka.remote.artery.canonical.port = 0 akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ] """) - .withFallback(akkaConfig) - .resolve() - AmberConfig.masterNodeAddr = createMasterAddress(addr) - createAmberSystem(workerConfig) + .withFallback(akkaConfig) + .resolve() + AmberConfig.masterNodeAddr = createMasterAddress(addr) + createAmberSystem(workerConfig) + } } private def createAmberSystem(actorSystemConf: Config): Unit = { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/model/websocket/response/ClusterStatusUpdateEvent.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/model/websocket/response/ClusterStatusUpdateEvent.scala index bda2d246f4c..6dc7623058e 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/model/websocket/response/ClusterStatusUpdateEvent.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/model/websocket/response/ClusterStatusUpdateEvent.scala @@ -2,4 +2,5 @@ package edu.uci.ics.texera.web.model.websocket.response import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent -case class ClusterStatusUpdateEvent(numWorkers: Int) extends TexeraWebSocketEvent +case class ClusterStatusUpdateEvent(numWorkers: Int, addresses: Seq[String] = Seq.empty) + extends TexeraWebSocketEvent diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/WorkflowWebsocketResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/WorkflowWebsocketResource.scala index 864f84784f0..12b308b02c1 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/WorkflowWebsocketResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/WorkflowWebsocketResource.scala @@ -36,7 +36,10 @@ class WorkflowWebsocketResource extends LazyLogging { val workflowState = WorkflowService.getOrCreate(WorkflowIdentity(wid)) sessionState.subscribe(workflowState) - sessionState.send(ClusterStatusUpdateEvent(ClusterListener.numWorkerNodesInCluster)) + val addressesStr = ClusterListener.currentAddresses.map(_.toString).toSeq + sessionState.send( + ClusterStatusUpdateEvent(ClusterListener.numWorkerNodesInCluster, addressesStr) + ) logger.info("connection open") } diff --git a/core/gui/src/app/app.module.ts b/core/gui/src/app/app.module.ts index ab3e5c29ddb..64c46191f8e 100644 --- a/core/gui/src/app/app.module.ts +++ b/core/gui/src/app/app.module.ts @@ -81,6 +81,7 @@ import { AdminGuardService } from "./dashboard/service/admin/guard/admin-guard.s import { ContextMenuComponent } from "./workspace/component/workflow-editor/context-menu/context-menu/context-menu.component"; import { CoeditorUserIconComponent } from "./workspace/component/menu/coeditor-user-icon/coeditor-user-icon.component"; import { InputAutoCompleteComponent } from "./workspace/component/input-autocomplete/input-autocomplete.component"; +import { InputNodeAddressComponent } from "./workspace/component/input-node-address/input-node-address.component"; import { CollabWrapperComponent } from "./common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component"; import { NzSwitchModule } from "ng-zorro-antd/switch"; import { AboutComponent } from "./hub/component/about/about.component"; @@ -214,6 +215,7 @@ registerLocaleData(en); ContextMenuComponent, CoeditorUserIconComponent, InputAutoCompleteComponent, + InputNodeAddressComponent, FileSelectionComponent, CollabWrapperComponent, AboutComponent, diff --git a/core/gui/src/app/common/formly/formly-config.ts b/core/gui/src/app/common/formly/formly-config.ts index bbd4be968eb..27f62c79ef0 100644 --- a/core/gui/src/app/common/formly/formly-config.ts +++ b/core/gui/src/app/common/formly/formly-config.ts @@ -7,6 +7,7 @@ import { CodeareaCustomTemplateComponent } from "../../workspace/component/codea import { PresetWrapperComponent } from "./preset-wrapper/preset-wrapper.component"; import { InputAutoCompleteComponent } from "../../workspace/component/input-autocomplete/input-autocomplete.component"; import { CollabWrapperComponent } from "./collab-wrapper/collab-wrapper/collab-wrapper.component"; +import { InputNodeAddressComponent } from "../../workspace/component/input-node-address/input-node-address.component"; /** * Configuration for using Json Schema with Formly. @@ -57,6 +58,7 @@ export const TEXERA_FORMLY_CONFIG = { { name: "multischema", component: MultiSchemaTypeComponent }, { name: "codearea", component: CodeareaCustomTemplateComponent }, { name: "inputautocomplete", component: InputAutoCompleteComponent, wrappers: ["form-field"] }, + { name: "inputnodeaddress", component: InputNodeAddressComponent, wrappers: ["form-field"] }, ], wrappers: [ { name: "preset-wrapper", component: PresetWrapperComponent }, diff --git a/core/gui/src/app/workspace/component/input-node-address/input-node-address.component.html b/core/gui/src/app/workspace/component/input-node-address/input-node-address.component.html new file mode 100644 index 00000000000..810254385c8 --- /dev/null +++ b/core/gui/src/app/workspace/component/input-node-address/input-node-address.component.html @@ -0,0 +1,10 @@ + + + + diff --git a/core/gui/src/app/workspace/component/input-node-address/input-node-address.component.scss b/core/gui/src/app/workspace/component/input-node-address/input-node-address.component.scss new file mode 100644 index 00000000000..003f7e1a20f --- /dev/null +++ b/core/gui/src/app/workspace/component/input-node-address/input-node-address.component.scss @@ -0,0 +1,7 @@ +:host { + display: block; +} + +nz-select { + width: 100%; +} diff --git a/core/gui/src/app/workspace/component/input-node-address/input-node-address.component.ts b/core/gui/src/app/workspace/component/input-node-address/input-node-address.component.ts new file mode 100644 index 00000000000..c8f6ec7bde5 --- /dev/null +++ b/core/gui/src/app/workspace/component/input-node-address/input-node-address.component.ts @@ -0,0 +1,40 @@ +import { Component, OnInit, OnDestroy } from "@angular/core"; +import { FieldType, FieldTypeConfig } from "@ngx-formly/core"; +import { Subject, Subscription } from "rxjs"; +import { WorkflowWebsocketService } from "../../service/workflow-websocket/workflow-websocket.service"; +import { takeUntil } from "rxjs/operators"; + +@Component({ + selector: "texera-input-node-address", + templateUrl: "./input-node-address.component.html", + styleUrls: ["./input-node-address.component.scss"], +}) +export class InputNodeAddressComponent extends FieldType implements OnInit, OnDestroy { + public nodeAddresses: string[] = []; + + private wsSubscription!: Subscription; + + private componentDestroy = new Subject(); + + constructor(private workflowWebsocketService: WorkflowWebsocketService) { + super(); + } + + ngOnInit(): void { + this.nodeAddresses = this.workflowWebsocketService.workerAddresses; + + this.wsSubscription = this.workflowWebsocketService + .websocketEvent() + .pipe(takeUntil(this.componentDestroy)) + .subscribe(event => { + if (event.type === "ClusterStatusUpdateEvent") { + this.nodeAddresses = event.addresses; + } + }); + } + + ngOnDestroy(): void { + this.componentDestroy.next(); + this.componentDestroy.complete(); + } +} diff --git a/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.html b/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.html index 4ddd0a56809..831a06abf0c 100644 --- a/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.html +++ b/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.html @@ -69,23 +69,53 @@ (focusout)="disconnectQuillFromText()" (keyup.enter)="disconnectQuillFromText()"> -
- - - -
+ + +
+ + + + +
+
+ + +
+ + + + +
+
+