From 2f581a01b91744ce62506249af498b6ecd53363f Mon Sep 17 00:00:00 2001
From: mengw15 <125719918+mengw15@users.noreply.github.com>
Date: Mon, 20 Jan 2025 18:09:41 -0800
Subject: [PATCH 01/52] first version
---
.../common/ExecutorDeployment.scala | 34 ++++++++++++-------
.../amber/engine/common/AmberRuntime.scala | 19 +++++++----
.../core/workflow/LocationPreference.scala | 2 ++
.../operator/distinct/DistinctOpDesc.scala | 29 ++++++++++++----
.../filter/SpecializedFilterOpDesc.scala | 29 ++++++++++++----
.../keywordSearch/KeywordSearchOpDesc.scala | 32 ++++++++++++-----
6 files changed, 105 insertions(+), 40 deletions(-)
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/common/ExecutorDeployment.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/common/ExecutorDeployment.scala
index e0a9d53eacd..547af38af96 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/common/ExecutorDeployment.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/common/ExecutorDeployment.scala
@@ -2,29 +2,25 @@ package edu.uci.ics.amber.engine.architecture.common
import akka.actor.{Address, Deploy}
import akka.remote.RemoteScope
-import edu.uci.ics.amber.core.workflow.{PhysicalOp, PreferController, RoundRobinPreference}
+import edu.uci.ics.amber.core.workflow.{PhysicalOp, PreferController, RoundRobinPreference, GoToSpecificNode}
import edu.uci.ics.amber.engine.architecture.controller.execution.OperatorExecution
import edu.uci.ics.amber.engine.architecture.deploysemantics.AddressInfo
import edu.uci.ics.amber.engine.architecture.pythonworker.PythonWorkflowWorker
import edu.uci.ics.amber.engine.architecture.scheduling.config.OperatorConfig
import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker
-import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{
- FaultToleranceConfig,
- StateRestoreConfig,
- WorkerReplayInitialization
-}
+import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{FaultToleranceConfig, StateRestoreConfig, WorkerReplayInitialization}
import edu.uci.ics.amber.util.VirtualIdentityUtils
object ExecutorDeployment {
def createWorkers(
- op: PhysicalOp,
- controllerActorService: AkkaActorService,
- operatorExecution: OperatorExecution,
- operatorConfig: OperatorConfig,
- stateRestoreConfig: Option[StateRestoreConfig],
- replayLoggingConfig: Option[FaultToleranceConfig]
- ): Unit = {
+ op: PhysicalOp,
+ controllerActorService: AkkaActorService,
+ operatorExecution: OperatorExecution,
+ operatorConfig: OperatorConfig,
+ stateRestoreConfig: Option[StateRestoreConfig],
+ replayLoggingConfig: Option[FaultToleranceConfig]
+ ): Unit = {
val addressInfo = AddressInfo(
controllerActorService.getClusterNodeAddresses,
@@ -38,7 +34,19 @@ object ExecutorDeployment {
val preferredAddress: Address = locationPreference match {
case PreferController =>
addressInfo.controllerAddress
+ case node: GoToSpecificNode =>
+ println("---------++")
+ println("worker id: " + workerId)
+ println("worker index: " + workerIndex)
+ addressInfo.allAddresses.foreach(address => println("Address: " + address + "Address host: " + address.host.get))
+ println("---------++")
+ addressInfo.allAddresses.find(addr => addr.host.get == node.nodeAddr).get
case RoundRobinPreference =>
+ println("+++++")
+ println("worker id: " + workerId)
+ println("worker index: " + workerIndex)
+ addressInfo.allAddresses.foreach(address => println("Address: " + address + "Address host: " + address.host.get))
+ println("+++++")
assert(
addressInfo.allAddresses.nonEmpty,
"Execution failed to start, no available computation nodes"
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
index e21a9eb0c09..36d5a8f0a85 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
@@ -8,7 +8,7 @@ import edu.uci.ics.amber.clustering.ClusterListener
import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor
import java.io.{BufferedReader, InputStreamReader}
-import java.net.URL
+import java.net.{InetAddress, URL}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
@@ -26,7 +26,7 @@ object AmberRuntime {
}
def scheduleRecurringCallThroughActorSystem(initialDelay: FiniteDuration, delay: FiniteDuration)(
- call: => Unit
+ call: => Unit
): Cancellable = {
_actorSystem.scheduler.scheduleWithFixedDelay(initialDelay, delay)(() => call)
}
@@ -35,7 +35,8 @@ object AmberRuntime {
try {
val query = new URL("http://checkip.amazonaws.com")
val in = new BufferedReader(new InputStreamReader(query.openStream()))
- in.readLine()
+ // in.readLine()
+ InetAddress.getLocalHost().getHostAddress()
} catch {
case e: Exception => throw e
}
@@ -50,8 +51,10 @@ object AmberRuntime {
val masterConfig = ConfigFactory
.parseString(s"""
akka.remote.artery.canonical.port = 2552
- akka.remote.artery.canonical.hostname = $localIpAddress
- akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ]
+ akka.remote.artery.canonical.hostname = "44.218.250.138"
+ akka.remote.artery.bind.hostname = $localIpAddress
+ akka.remote.artery.bind.port = 2552
+ akka.cluster.seed-nodes = [ "akka://Amber@44.218.250.138:2552" ]
""")
.withFallback(akkaConfig)
AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
@@ -70,9 +73,11 @@ object AmberRuntime {
}
val workerConfig = ConfigFactory
.parseString(s"""
- akka.remote.artery.canonical.hostname = $localIpAddress
+ akka.remote.artery.canonical.hostname = "44.218.250.138"
akka.remote.artery.canonical.port = 0
- akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ]
+ akka.remote.artery.bind.hostname = $localIpAddress
+ akka.remote.artery.bind.port = 0
+ akka.cluster.seed-nodes = [ "akka://Amber@44.218.250.138:2552" ]
""")
.withFallback(akkaConfig)
AmberConfig.masterNodeAddr = createMasterAddress(addr)
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/LocationPreference.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/LocationPreference.scala
index 40372b93dd6..ba7c77aa965 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/LocationPreference.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/LocationPreference.scala
@@ -12,3 +12,5 @@ object PreferController extends LocationPreference
// - Operator A: Worker 1 -> Node 1, Worker 2 -> Node 2, Worker 3 -> Node 3
// - Operator B: Worker 1 -> Node 1, Worker 2 -> Node 2
object RoundRobinPreference extends LocationPreference
+
+case class GoToSpecificNode(nodeAddr:String) extends LocationPreference
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/distinct/DistinctOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/distinct/DistinctOpDesc.scala
index 7f851743b18..1d70db53bb0 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/distinct/DistinctOpDesc.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/distinct/DistinctOpDesc.scala
@@ -1,18 +1,31 @@
package edu.uci.ics.amber.operator.distinct
+import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
+import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle}
import edu.uci.ics.amber.core.executor.OpExecWithClassName
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
-import edu.uci.ics.amber.core.workflow.{HashPartition, InputPort, OutputPort, PhysicalOp}
+import edu.uci.ics.amber.core.workflow.{GoToSpecificNode, HashPartition, InputPort, OutputPort, PhysicalOp}
import edu.uci.ics.amber.operator.LogicalOp
+import edu.uci.ics.amber.operator.metadata.annotations.UIWidget
import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
class DistinctOpDesc extends LogicalOp {
+ @JsonProperty(required = false)
+ @JsonSchemaTitle("nodeAddr")
+ @JsonSchemaInject(json = UIWidget.UIWidgetTextArea)
+ var nodeAddr: String = _
+
+ @JsonProperty(defaultValue = "true")
+ @JsonSchemaTitle("location preference(default)")
+ @JsonPropertyDescription("Whether use default RoundRobinPreference")
+ var UseRoundRobin: Boolean = true
+
override def getPhysicalOp(
- workflowId: WorkflowIdentity,
- executionId: ExecutionIdentity
- ): PhysicalOp = {
- PhysicalOp
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ): PhysicalOp = {
+ val baseOp = PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
@@ -23,7 +36,11 @@ class DistinctOpDesc extends LogicalOp {
.withOutputPorts(operatorInfo.outputPorts)
.withPartitionRequirement(List(Option(HashPartition())))
.withDerivePartition(_ => HashPartition())
-
+ if (!UseRoundRobin) {
+ baseOp.withLocationPreference(Some(GoToSpecificNode(nodeAddr))) // Use the actual `nodeAddr`
+ } else {
+ baseOp // Return without modifying location preference
+ }
}
override def operatorInfo: OperatorInfo =
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.scala
index 340e2ee8b48..75c21def9e0 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.scala
@@ -1,12 +1,13 @@
package edu.uci.ics.amber.operator.filter
import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
+import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle}
import edu.uci.ics.amber.core.executor.OpExecWithClassName
-import edu.uci.ics.amber.core.workflow.PhysicalOp
+import edu.uci.ics.amber.core.workflow.{GoToSpecificNode, InputPort, OutputPort, PhysicalOp}
import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
import edu.uci.ics.amber.util.JSONUtils.objectMapper
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
-import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort}
+import edu.uci.ics.amber.operator.metadata.annotations.UIWidget
class SpecializedFilterOpDesc extends FilterOpDesc {
@@ -14,11 +15,21 @@ class SpecializedFilterOpDesc extends FilterOpDesc {
@JsonPropertyDescription("multiple predicates in OR")
var predicates: List[FilterPredicate] = List.empty
+ @JsonProperty(required = false)
+ @JsonSchemaTitle("nodeAddr")
+ @JsonSchemaInject(json = UIWidget.UIWidgetTextArea)
+ var nodeAddr: String = _
+
+ @JsonProperty(defaultValue = "true")
+ @JsonSchemaTitle("location preference(default)")
+ @JsonPropertyDescription("Whether use default RoundRobinPreference")
+ var UseRoundRobin: Boolean = true
+
override def getPhysicalOp(
- workflowId: WorkflowIdentity,
- executionId: ExecutionIdentity
- ): PhysicalOp = {
- PhysicalOp
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ): PhysicalOp = {
+ val baseOp = PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
@@ -30,6 +41,12 @@ class SpecializedFilterOpDesc extends FilterOpDesc {
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
+
+ if (!UseRoundRobin) {
+ baseOp.withLocationPreference(Some(GoToSpecificNode(nodeAddr))) // Use the actual `nodeAddr`
+ } else {
+ baseOp // Return without modifying location preference
+ }
}
override def operatorInfo: OperatorInfo = {
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/keywordSearch/KeywordSearchOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/keywordSearch/KeywordSearchOpDesc.scala
index b3e41f267e6..30773288ad2 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/keywordSearch/KeywordSearchOpDesc.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/keywordSearch/KeywordSearchOpDesc.scala
@@ -1,15 +1,14 @@
package edu.uci.ics.amber.operator.keywordSearch
import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
-import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle}
import edu.uci.ics.amber.core.executor.OpExecWithClassName
-import edu.uci.ics.amber.core.workflow.PhysicalOp
+import edu.uci.ics.amber.core.workflow.{GoToSpecificNode, InputPort, OutputPort, PhysicalOp}
import edu.uci.ics.amber.operator.filter.FilterOpDesc
import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
-import edu.uci.ics.amber.operator.metadata.annotations.AutofillAttributeName
+import edu.uci.ics.amber.operator.metadata.annotations.{AutofillAttributeName, UIWidget}
import edu.uci.ics.amber.util.JSONUtils.objectMapper
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
-import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort}
class KeywordSearchOpDesc extends FilterOpDesc {
@@ -24,11 +23,22 @@ class KeywordSearchOpDesc extends FilterOpDesc {
@JsonPropertyDescription("keywords")
var keyword: String = _
+ @JsonProperty(required = false)
+ @JsonSchemaTitle("nodeAddr")
+ @JsonSchemaInject(json = UIWidget.UIWidgetTextArea)
+ var nodeAddr: String = _
+
+ @JsonProperty(defaultValue = "true")
+ @JsonSchemaTitle("location preference(default)")
+ @JsonPropertyDescription("Whether use default RoundRobinPreference")
+ var UseRoundRobin: Boolean = true
+
+
override def getPhysicalOp(
- workflowId: WorkflowIdentity,
- executionId: ExecutionIdentity
- ): PhysicalOp = {
- PhysicalOp
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ): PhysicalOp = {
+ val baseOp = PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
@@ -40,6 +50,12 @@ class KeywordSearchOpDesc extends FilterOpDesc {
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
+
+ if (!UseRoundRobin) {
+ baseOp.withLocationPreference(Some(GoToSpecificNode(nodeAddr))) // Use the actual `nodeAddr`
+ } else {
+ baseOp // Return without modifying location preference
+ }
}
override def operatorInfo: OperatorInfo =
From 046edea46dd0b786b591290af467a9f29ecd6b11 Mon Sep 17 00:00:00 2001
From: mengw15 <125719918+mengw15@users.noreply.github.com>
Date: Sun, 2 Feb 2025 20:31:37 -0800
Subject: [PATCH 02/52] finish amberruntime
---
.../amber/engine/common/AmberRuntime.scala | 75 ++++++++++++++-----
1 file changed, 56 insertions(+), 19 deletions(-)
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
index 36d5a8f0a85..810c4fa8470 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
@@ -35,8 +35,14 @@ object AmberRuntime {
try {
val query = new URL("http://checkip.amazonaws.com")
val in = new BufferedReader(new InputStreamReader(query.openStream()))
- // in.readLine()
- InetAddress.getLocalHost().getHostAddress()
+ val ip = in.readLine()
+ val localIp = InetAddress.getLocalHost().getHostAddress()
+ println(s"public++++++++++in.readLine(): $ip") // 打印 IP
+ println(s"private——————————InetAddress.getLocalHost().getHostAddress(): $localIp") // 打印 IP
+ //打印public++++++++++in.readLine(): 35.84.255.218
+ //private——————————InetAddress.getLocalHost().getHostAddress(): 172.31.31.180
+ ip
+ // InetAddress.getLocalHost().getHostAddress()
} catch {
case e: Exception => throw e
}
@@ -46,19 +52,33 @@ object AmberRuntime {
var localIpAddress = "localhost"
if (clusterMode) {
localIpAddress = getNodeIpAddress
- }
- val masterConfig = ConfigFactory
- .parseString(s"""
+ val localPrivateIdAddress = InetAddress.getLocalHost().getHostAddress()
+
+ val masterConfig = ConfigFactory
+ .parseString(s"""
+ akka.remote.artery.canonical.port = 2552
+ akka.remote.artery.canonical.hostname = $localIpAddress
+ akka.remote.artery.bind.hostname = $localPrivateIdAddress
+ akka.remote.artery.bind.port = 2552
+ akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ]
+ """)
+ .withFallback(akkaConfig)
+ AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
+ createAmberSystem(masterConfig)
+ }
+ else{
+ val masterConfig = ConfigFactory
+ .parseString(s"""
akka.remote.artery.canonical.port = 2552
- akka.remote.artery.canonical.hostname = "44.218.250.138"
- akka.remote.artery.bind.hostname = $localIpAddress
- akka.remote.artery.bind.port = 2552
- akka.cluster.seed-nodes = [ "akka://Amber@44.218.250.138:2552" ]
+ akka.remote.artery.canonical.hostname = $localIpAddress
+ akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ]
""")
- .withFallback(akkaConfig)
- AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
- createAmberSystem(masterConfig)
+ .withFallback(akkaConfig)
+ AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
+ createAmberSystem(masterConfig)
+
+ }
}
def akkaConfig: Config = ConfigFactory.load("cluster").withFallback(defaultApplication())
@@ -69,19 +89,36 @@ object AmberRuntime {
val addr = mainNodeAddress.getOrElse("localhost")
var localIpAddress = "localhost"
if (mainNodeAddress.isDefined) {
+
+ println(s"main++++++++++NodeAddress: $mainNodeAddress")
+ //打印main++++++++++NodeAddress: Some(35.84.255.218)
+
localIpAddress = getNodeIpAddress
+
+ val localPrivateIdAddress = InetAddress.getLocalHost().getHostAddress()
+
+ val workerConfig = ConfigFactory
+ .parseString(s"""
+ akka.remote.artery.canonical.hostname = $localIpAddress
+ akka.remote.artery.canonical.port = 0
+ akka.remote.artery.bind.hostname = $localPrivateIdAddress
+ akka.remote.artery.bind.port = 0
+ akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ]
+ """)
+ .withFallback(akkaConfig)
+ AmberConfig.masterNodeAddr = createMasterAddress(addr)
+ createAmberSystem(workerConfig)
}
- val workerConfig = ConfigFactory
+
+ else{val workerConfig = ConfigFactory
.parseString(s"""
- akka.remote.artery.canonical.hostname = "44.218.250.138"
+ akka.remote.artery.canonical.hostname = $localIpAddress
akka.remote.artery.canonical.port = 0
- akka.remote.artery.bind.hostname = $localIpAddress
- akka.remote.artery.bind.port = 0
- akka.cluster.seed-nodes = [ "akka://Amber@44.218.250.138:2552" ]
+ akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ]
""")
.withFallback(akkaConfig)
- AmberConfig.masterNodeAddr = createMasterAddress(addr)
- createAmberSystem(workerConfig)
+ AmberConfig.masterNodeAddr = createMasterAddress(addr)
+ createAmberSystem(workerConfig)}
}
private def createAmberSystem(actorSystemConf: Config): Unit = {
From 70f55b354f15f4b1c457104a2fcc452875ba4416 Mon Sep 17 00:00:00 2001
From: mengw15 <125719918+mengw15@users.noreply.github.com>
Date: Mon, 3 Feb 2025 13:10:42 -0800
Subject: [PATCH 03/52] make advance attribute show in different tab
---
...perator-property-edit-frame.component.html | 76 ++++++++++++++-----
.../operator-property-edit-frame.component.ts | 22 +++++-
2 files changed, 80 insertions(+), 18 deletions(-)
diff --git a/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.html b/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.html
index ea2cd19131e..f53af9f9b07 100644
--- a/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.html
+++ b/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.html
@@ -68,23 +68,65 @@
(focusout)="disconnectQuillFromText()"
(keyup.enter)="disconnectQuillFromText()">
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 0">
+
+
+
+
+ 0">
+
+
+
+