Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
2f581a0
first version
mengw15 Jan 21, 2025
046edea
finish amberruntime
mengw15 Feb 3, 2025
70f55b3
make advance attribute show in different tab
mengw15 Feb 3, 2025
3745771
add a comment
mengw15 Feb 3, 2025
6ec182a
first version
mengw15 Feb 10, 2025
e1c0bd8
first version 2
mengw15 Feb 10, 2025
ca66592
revert
mengw15 Feb 10, 2025
a931f49
1
mengw15 Feb 10, 2025
b82bb6f
basicFields和advancedFields,validate问题
mengw15 Feb 18, 2025
b5a3215
change Manual Location to Auto Select Node Address
mengw15 Feb 18, 2025
1ad3301
fix
mengw15 Feb 18, 2025
c20b945
fix
mengw15 Feb 18, 2025
e3e6c0c
update aggregate operator
mengw15 Feb 18, 2025
e28f42f
update CartesianProduct, DictionartMatcher, Difference
mengw15 Feb 18, 2025
73c9939
update distinct, specilizedFilter
mengw15 Feb 18, 2025
3b83d54
update hashJoinOp
mengw15 Feb 18, 2025
f94fe2a
update pythonOperatorDescriptor
mengw15 Feb 18, 2025
af0ac85
update ifOp, intersectOp, intervalJoinOp
mengw15 Feb 18, 2025
be329a5
update keywordsearch, limit
mengw15 Feb 18, 2025
791c6a0
update projectionOp, randomKSamplingOp, RegexOp
mengw15 Feb 18, 2025
57336f8
update ReservoirSampling, SentimentAnalysis
mengw15 Feb 18, 2025
f7e51fb
update sortPartitionsOp, SplitOp, SymmetricDifferenceOp
mengw15 Feb 18, 2025
f06a06a
update typeCastingOp, JavaUDFOp
mengw15 Feb 18, 2025
fbc2b21
update DualInputPortsPythonUDFOp
mengw15 Feb 18, 2025
06e2c24
update PythonUDFOp
mengw15 Feb 18, 2025
9b0fcb6
update RUDFOp
mengw15 Feb 18, 2025
4c54322
update UnionOp, UnnestStringOp
mengw15 Feb 18, 2025
1f6ffcb
update HtmlVizOp
mengw15 Feb 18, 2025
736929b
update UrlVizOp
mengw15 Feb 18, 2025
9911ca3
backend fmt fix
mengw15 Feb 18, 2025
03c1af9
frontend fmt fix
mengw15 Feb 18, 2025
b56a1c4
Merge branch 'master' into meng-operator-assign-2
mengw15 Feb 18, 2025
c1f11d5
clean
mengw15 Feb 18, 2025
f04b951
Update input-node-address.component.ts
mengw15 Feb 18, 2025
99a14d4
code cleaning
mengw15 Feb 20, 2025
de37a08
code cleaning
mengw15 Feb 20, 2025
0fb0f3b
add comment
mengw15 Feb 20, 2025
9549a59
Update ManualLocationConfiguration.scala
mengw15 Feb 20, 2025
3f8aab0
Merge branch 'master' into meng-operator-assign-2
mengw15 Feb 20, 2025
a42ca61
Merge branch 'master' into meng-operator-assign-2
mengw15 Feb 24, 2025
f6d843b
reverse sentimentOP
mengw15 Apr 5, 2025
f3bbb49
Merge branch 'master' into meng-operator-assign-2
mengw15 Apr 5, 2025
caeabf5
add source op
mengw15 Apr 5, 2025
54b6f09
fmt fix
mengw15 Apr 5, 2025
f4ed91c
Merge branch 'master' into meng-operator-assign-2
mengw15 Apr 7, 2025
69832d6
Merge branch 'master' into meng-operator-assign-2
mengw15 Apr 11, 2025
65d1d86
Merge branch 'master' into meng-operator-assign-2
mengw15 Apr 13, 2025
adff92f
Merge branch 'master' into meng-operator-assign-2
mengw15 Apr 14, 2025
f56c352
remove comment
mengw15 Apr 14, 2025
ccf9c99
rename to DesignatedLocationConfigurable trait
mengw15 Apr 14, 2025
101ed75
update comment
mengw15 Apr 14, 2025
4dd9723
rename it to configureLocationPreference
mengw15 Apr 14, 2025
1162b62
update aggregate, cartessianProduct, PythonOperator
mengw15 Apr 14, 2025
440cf22
update dictionaryMatch, difference, distinct, filter
mengw15 Apr 14, 2025
1bdcbf1
update hashjoin
mengw15 Apr 14, 2025
263c38c
update if, intersect. intervaljoin, keywordsearch, limit, projection
mengw15 Apr 14, 2025
5affdb9
update randomKsampling, Regex, ReservoirSampling, SortPartitions
mengw15 Apr 14, 2025
b34c5ba
update some more oerators with using pipe
mengw15 Apr 14, 2025
a987554
update source operators with using pipe
mengw15 Apr 14, 2025
edbe7ca
fmt fix
mengw15 Apr 14, 2025
567b204
Merge branch 'master' into meng-operator-assign-2
mengw15 Apr 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ object ClusterListener {
final case class GetAvailableNodeAddresses()

var numWorkerNodesInCluster = 0

var currentAddresses: Set[Address] = Set.empty
}

class ClusterListener extends Actor with AmberLogging {
Expand Down Expand Up @@ -117,9 +119,13 @@ class ClusterListener extends Actor with AmberLogging {
case other => //skip
}

ClusterListener.currentAddresses = getAllAddress.toSet
numWorkerNodesInCluster = getAllAddress.size

val addressesStr = ClusterListener.currentAddresses.map(_.toString).toSeq

SessionState.getAllSessionStates.foreach { state =>
state.send(ClusterStatusUpdateEvent(numWorkerNodesInCluster))
state.send(ClusterStatusUpdateEvent(numWorkerNodesInCluster, addressesStr))
}

logger.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package edu.uci.ics.amber.engine.architecture.common

import akka.actor.{Address, Deploy}
import akka.actor.{Address, AddressFromURIString, Deploy}
import akka.remote.RemoteScope
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PreferController, RoundRobinPreference}
import edu.uci.ics.amber.core.workflow.{
GoToSpecificNode,
PhysicalOp,
PreferController,
RoundRobinPreference
}
import edu.uci.ics.amber.engine.architecture.controller.execution.OperatorExecution
import edu.uci.ics.amber.engine.architecture.deploysemantics.AddressInfo
import edu.uci.ics.amber.engine.architecture.pythonworker.PythonWorkflowWorker
Expand Down Expand Up @@ -38,6 +43,16 @@ object ExecutorDeployment {
val preferredAddress: Address = locationPreference match {
case PreferController =>
addressInfo.controllerAddress
case node: GoToSpecificNode =>
val targetAddress = AddressFromURIString(node.nodeAddr)
addressInfo.allAddresses.find(addr => addr == targetAddress) match {
case Some(address) => address
case None =>
throw new IllegalStateException(
s"Designated node address '${node.nodeAddr}' not found among available addresses: " +
addressInfo.allAddresses.map(_.host.getOrElse("None")).mkString(", ")
)
}
case RoundRobinPreference =>
assert(
addressInfo.allAddresses.nonEmpty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import edu.uci.ics.amber.clustering.ClusterListener
import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor

import java.io.{BufferedReader, InputStreamReader}
import java.net.URL
import java.net.{InetAddress, URL}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -46,7 +46,9 @@ object AmberRuntime {
try {
val query = new URL("http://checkip.amazonaws.com")
val in = new BufferedReader(new InputStreamReader(query.openStream()))
in.readLine()
val ip = in.readLine()
val localIp = InetAddress.getLocalHost().getHostAddress()
ip
} catch {
case e: Exception => throw e
}
Expand All @@ -56,18 +58,33 @@ object AmberRuntime {
var localIpAddress = "localhost"
if (clusterMode) {
localIpAddress = getNodeIpAddress
}

val masterConfig = ConfigFactory
.parseString(s"""
val localPrivateIdAddress = InetAddress.getLocalHost().getHostAddress()

val masterConfig = ConfigFactory
.parseString(s"""
akka.remote.artery.canonical.port = 2552
akka.remote.artery.canonical.hostname = $localIpAddress
akka.remote.artery.bind.hostname = $localPrivateIdAddress
akka.remote.artery.bind.port = 2552
akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ]
""")
.withFallback(akkaConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
createAmberSystem(masterConfig)
} else {
val masterConfig = ConfigFactory
.parseString(s"""
akka.remote.artery.canonical.port = 2552
akka.remote.artery.canonical.hostname = $localIpAddress
akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ]
""")
.withFallback(akkaConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
createAmberSystem(masterConfig)
.withFallback(akkaConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
createAmberSystem(masterConfig)
}
}

def akkaConfig: Config =
Expand All @@ -79,18 +96,35 @@ object AmberRuntime {
val addr = mainNodeAddress.getOrElse("localhost")
var localIpAddress = "localhost"
if (mainNodeAddress.isDefined) {

localIpAddress = getNodeIpAddress
}
val workerConfig = ConfigFactory
.parseString(s"""

val localPrivateIdAddress = InetAddress.getLocalHost().getHostAddress()

val workerConfig = ConfigFactory
.parseString(s"""
akka.remote.artery.canonical.hostname = $localIpAddress
akka.remote.artery.canonical.port = 0
akka.remote.artery.bind.hostname = $localPrivateIdAddress
akka.remote.artery.bind.port = 0
akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ]
""")
.withFallback(akkaConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(addr)
createAmberSystem(workerConfig)
} else {
val workerConfig = ConfigFactory
.parseString(s"""
akka.remote.artery.canonical.hostname = $localIpAddress
akka.remote.artery.canonical.port = 0
akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ]
""")
.withFallback(akkaConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(addr)
createAmberSystem(workerConfig)
.withFallback(akkaConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(addr)
createAmberSystem(workerConfig)
}
}

private def createAmberSystem(actorSystemConf: Config): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ package edu.uci.ics.texera.web.model.websocket.response

import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent

case class ClusterStatusUpdateEvent(numWorkers: Int) extends TexeraWebSocketEvent
case class ClusterStatusUpdateEvent(numWorkers: Int, addresses: Seq[String] = Seq.empty)
extends TexeraWebSocketEvent
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ class WorkflowWebsocketResource extends LazyLogging {
val workflowState =
WorkflowService.getOrCreate(WorkflowIdentity(wid))
sessionState.subscribe(workflowState)
sessionState.send(ClusterStatusUpdateEvent(ClusterListener.numWorkerNodesInCluster))
val addressesStr = ClusterListener.currentAddresses.map(_.toString).toSeq
sessionState.send(
ClusterStatusUpdateEvent(ClusterListener.numWorkerNodesInCluster, addressesStr)
)
logger.info("connection open")
}

Expand Down
2 changes: 2 additions & 0 deletions core/gui/src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import { AdminGuardService } from "./dashboard/service/admin/guard/admin-guard.s
import { ContextMenuComponent } from "./workspace/component/workflow-editor/context-menu/context-menu/context-menu.component";
import { CoeditorUserIconComponent } from "./workspace/component/menu/coeditor-user-icon/coeditor-user-icon.component";
import { InputAutoCompleteComponent } from "./workspace/component/input-autocomplete/input-autocomplete.component";
import { InputNodeAddressComponent } from "./workspace/component/input-node-address/input-node-address.component";
import { CollabWrapperComponent } from "./common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component";
import { NzSwitchModule } from "ng-zorro-antd/switch";
import { AboutComponent } from "./hub/component/about/about.component";
Expand Down Expand Up @@ -214,6 +215,7 @@ registerLocaleData(en);
ContextMenuComponent,
CoeditorUserIconComponent,
InputAutoCompleteComponent,
InputNodeAddressComponent,
FileSelectionComponent,
CollabWrapperComponent,
AboutComponent,
Expand Down
2 changes: 2 additions & 0 deletions core/gui/src/app/common/formly/formly-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { CodeareaCustomTemplateComponent } from "../../workspace/component/codea
import { PresetWrapperComponent } from "./preset-wrapper/preset-wrapper.component";
import { InputAutoCompleteComponent } from "../../workspace/component/input-autocomplete/input-autocomplete.component";
import { CollabWrapperComponent } from "./collab-wrapper/collab-wrapper/collab-wrapper.component";
import { InputNodeAddressComponent } from "../../workspace/component/input-node-address/input-node-address.component";

/**
* Configuration for using Json Schema with Formly.
Expand Down Expand Up @@ -57,6 +58,7 @@ export const TEXERA_FORMLY_CONFIG = {
{ name: "multischema", component: MultiSchemaTypeComponent },
{ name: "codearea", component: CodeareaCustomTemplateComponent },
{ name: "inputautocomplete", component: InputAutoCompleteComponent, wrappers: ["form-field"] },
{ name: "inputnodeaddress", component: InputNodeAddressComponent, wrappers: ["form-field"] },
],
wrappers: [
{ name: "preset-wrapper", component: PresetWrapperComponent },
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<nz-select
[formControl]="formControl"
[nzPlaceHolder]="'Select Node Address'"
nzAllowClear>
<nz-option
*ngFor="let addr of nodeAddresses"
[nzValue]="addr"
[nzLabel]="addr">
</nz-option>
</nz-select>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
:host {
display: block;
}

nz-select {
width: 100%;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Component, OnInit, OnDestroy } from "@angular/core";
import { FieldType, FieldTypeConfig } from "@ngx-formly/core";
import { Subject, Subscription } from "rxjs";
import { WorkflowWebsocketService } from "../../service/workflow-websocket/workflow-websocket.service";
import { takeUntil } from "rxjs/operators";

@Component({
selector: "texera-input-node-address",
templateUrl: "./input-node-address.component.html",
styleUrls: ["./input-node-address.component.scss"],
})
export class InputNodeAddressComponent extends FieldType<FieldTypeConfig> implements OnInit, OnDestroy {
public nodeAddresses: string[] = [];

private wsSubscription!: Subscription;

private componentDestroy = new Subject<void>();

constructor(private workflowWebsocketService: WorkflowWebsocketService) {
super();
}

ngOnInit(): void {
this.nodeAddresses = this.workflowWebsocketService.workerAddresses;

this.wsSubscription = this.workflowWebsocketService
.websocketEvent()
.pipe(takeUntil(this.componentDestroy))
.subscribe(event => {
if (event.type === "ClusterStatusUpdateEvent") {
this.nodeAddresses = event.addresses;
}
});
}

ngOnDestroy(): void {
this.componentDestroy.next();
this.componentDestroy.complete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,53 @@
(focusout)="disconnectQuillFromText()"
(keyup.enter)="disconnectQuillFromText()"></div>

<form
nz-form
[nzLayout]="'vertical'"
*ngIf="formlyFields && formlyFormGroup"
[formGroup]="formlyFormGroup"
class="property-editor-form">
<formly-form
(modelChange)="onFormChanges($event)"
[fields]="formlyFields"
[form]="formlyFormGroup"
[model]="formData"
[options]="formlyOptions">
</formly-form>
<texera-type-casting-display
*ngIf="isTypeCasting"
currentOperatorId="{{this.currentOperatorId}}"></texera-type-casting-display>
</form>
<nz-tabset>
<nz-tab
nzTitle="Basic Settings"
*ngIf="basicFields.length > 0">
<form
nz-form
nzLayout="vertical"
*ngIf="formlyFields && formlyFormGroup"
[formGroup]="formlyFormGroup"
class="property-editor-form">
<formly-form
(modelChange)="onFormChanges($event)"
[fields]="basicFields"
[form]="formlyFormGroup"
[model]="formData"
[options]="formlyOptions">
</formly-form>

<texera-type-casting-display
*ngIf="isTypeCasting"
currentOperatorId="{{this.currentOperatorId}}"></texera-type-casting-display>
</form>
</nz-tab>

<nz-tab
nzTitle="Advanced Settings"
*ngIf="advancedFields.length > 0">
<form
nz-form
nzLayout="vertical"
*ngIf="formlyFields && formlyFormGroup"
[formGroup]="formlyFormGroup"
class="property-editor-form">
<formly-form
(modelChange)="onFormChanges($event)"
[fields]="advancedFields"
[form]="formlyFormGroup"
[model]="formData"
[options]="formlyOptions">
</formly-form>

<texera-type-casting-display
*ngIf="isTypeCasting"
currentOperatorId="{{this.currentOperatorId}}"></texera-type-casting-display>
</form>
</nz-tab>
</nz-tabset>

<button
(click)="allowModifyOperatorLogic()"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ export class OperatorPropertyEditFrameComponent implements OnInit, OnChanges, On
formlyFields: FormlyFieldConfig[] | undefined;
formTitle: string | undefined;

public basicFields: FormlyFieldConfig[] = [];
public advancedFields: FormlyFieldConfig[] = [];

// The field name and its css style to be overridden, e.g., for showing the diff between two workflows.
// example: new Map([
// ["attribute", "outline: 3px solid green; transition: 0.3s ease-in-out outline;"],
Expand Down Expand Up @@ -379,6 +382,20 @@ export class OperatorPropertyEditFrameComponent implements OnInit, OnChanges, On
},
};

if (mappedField.key == "nodeAddr") {
mappedField.props = {
...mappedField.props,
tab: "AdvancedSettings",
};
}

if (mappedField.key == "autoSelectNodeAddress") {
mappedField.props = {
...mappedField.props,
tab: "AdvancedSettings",
};
}

// Disable dummy operator for user
if (mappedField.key === "dummyOperator") {
mappedField.expressions = {
Expand Down Expand Up @@ -432,6 +449,11 @@ export class OperatorPropertyEditFrameComponent implements OnInit, OnChanges, On
mappedField.type = "inputautocomplete";
}

// if the title is nodeAddr, then change it to custom inputnodeaddress input template
if (mappedField.key == "nodeAddr") {
mappedField.type = "inputnodeaddress";
}

// if the title is python script (for Python UDF), then make this field a custom template 'codearea'
if (mapSource?.description?.toLowerCase() === "input your code here") {
if (mappedField.type) {
Expand Down Expand Up @@ -669,6 +691,10 @@ export class OperatorPropertyEditFrameComponent implements OnInit, OnChanges, On
// not return field.fieldGroup directly because
// doing so the validator in the field will not be triggered
this.formlyFields = [field];

const fieldGroup = this.formlyFields?.[0]?.fieldGroup ?? [];
this.basicFields = fieldGroup.filter(f => f.props?.tab !== "AdvancedSettings");
this.advancedFields = fieldGroup.filter(f => f.props?.tab === "AdvancedSettings");
}

allowModifyOperatorLogic(): void {
Expand Down
Loading
Loading