diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 0b68aad221e..e1011d1b31d 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -304,10 +304,15 @@ class CostBasedScheduleGenerator(
     */
   private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = {
     val searchResultFuture: Future[SearchResult] = Future {
-      if (ApplicationConfig.useTopDownSearch)
-        topDownSearch(globalSearch = ApplicationConfig.useGlobalSearch)
-      else
-        bottomUpSearch(globalSearch = ApplicationConfig.useGlobalSearch)
+      workflowContext.workflowSettings.executionMode match {
+        case ExecutionMode.MATERIALIZED =>
+          materializedSearch()
+        case ExecutionMode.PIPELINED =>
+          if (ApplicationConfig.useTopDownSearch)
+            topDownSearch(globalSearch = ApplicationConfig.useGlobalSearch)
+          else
+            bottomUpSearch(globalSearch = ApplicationConfig.useGlobalSearch)
+      }
     }
     val searchResult = Try(
       Await.result(searchResultFuture, ApplicationConfig.searchTimeoutMilliseconds.milliseconds)
@@ -477,6 +482,29 @@ class CostBasedScheduleGenerator(
     )
   }
 
+  /** Constructs a baseline fully materialized region plan (one operator per region) and evaluates its cost. */
+  def materializedSearch(): SearchResult = {
+    val startTime = System.nanoTime()
+
+    val (regionDAG, cost) =
+      tryConnectRegionDAG(physicalPlan.links) match {
+        case Left(dag) => (dag, allocateResourcesAndEvaluateCost(dag))
+        case Right(_) =>
+          (
+            new DirectedAcyclicGraph[Region, RegionLink](classOf[RegionLink]),
+            Double.PositiveInfinity
+          )
+      }
+
+    SearchResult(
+      state = Set.empty,
+      regionDAG = regionDAG,
+      cost = cost,
+      searchTimeNanoSeconds = System.nanoTime() - startTime,
+      numStatesExplored = 1
+    )
+  }
+
   /**
     * Another direction to perform the search. Depending on the configuration, either a global search or a greedy search
     * will be performed to find an optimal plan. The search starts from a plan where all edges are materialized, and
diff --git a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
index 50d07a98194..7f17e1aebd7 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
@@ -151,8 +151,11 @@ class WorkflowCompiler(
     val (physicalPlan, outputPortsNeedingStorage) =
       expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None)
 
-    context.workflowSettings =
-      WorkflowSettings(context.workflowSettings.dataTransferBatchSize, outputPortsNeedingStorage)
+    context.workflowSettings = WorkflowSettings(
+      context.workflowSettings.dataTransferBatchSize,
+      outputPortsNeedingStorage,
+      context.workflowSettings.executionMode
+    )
 
     Workflow(context, logicalPlan, physicalPlan)
   }
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
index 07c1758d9e8..1ac6b7b599c 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
@@ -19,7 +19,12 @@
 
 package org.apache.texera.amber.engine.architecture.scheduling
 
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.{
+  ExecutionMode,
+  PortIdentity,
+  WorkflowContext,
+  WorkflowSettings
+}
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow
 import org.apache.texera.amber.operator.TestOperators
@@ -27,6 +32,8 @@ import org.apache.texera.workflow.LogicalLink
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.flatspec.AnyFlatSpec
 
+import scala.jdk.CollectionConverters._
+
 class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
 
   "CostBasedRegionPlanGenerator" should "finish bottom-up search using different pruning techniques with correct number of states explored in csv->->filter->join->filter2 workflow" in {
@@ -206,4 +213,306 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
 
   }
 
+  // MATERIALIZED ExecutionMode tests - each operator should be a separate region
+  "CostBasedRegionPlanGenerator" should "create separate region for each operator in MATERIALIZED mode for simple csv workflow" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(csvOpDesc),
+      List(),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.materializedSearch()
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+  }
+
+  "CostBasedRegionPlanGenerator" should "create separate region for each operator in MATERIALIZED mode for csv->keyword workflow" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.materializedSearch()
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+
+    // All links should be materialized (represented as region links)
+    val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+    val numPhysicalLinks = workflow.physicalPlan.links.size
+    assert(
+      numRegionLinks == numPhysicalLinks,
+      s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+    )
+  }
+
+  "CostBasedRegionPlanGenerator" should "create separate region for each operator in MATERIALIZED mode for csv->keyword->count workflow" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val countOpDesc = TestOperators.aggregateAndGroupByDesc(
+      "Region",
+      org.apache.texera.amber.operator.aggregate.AggregationFunction.COUNT,
+      List[String]()
+    )
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc, countOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(),
+          countOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.materializedSearch()
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+
+    // All links should be materialized (represented as region links)
+    val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+    val numPhysicalLinks = workflow.physicalPlan.links.size
+    assert(
+      numRegionLinks == numPhysicalLinks,
+      s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+    )
+  }
+
+  "CostBasedRegionPlanGenerator" should "create separate region for each operator in MATERIALIZED mode for join workflow" in {
+    val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(
+        headerlessCsvOpDesc1,
+        headerlessCsvOpDesc2,
+        joinOpDesc
+      ),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc1.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          headerlessCsvOpDesc2.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity(1)
+        )
+      ),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.materializedSearch()
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+
+    // All links should be materialized (represented as region links)
+    val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+    val numPhysicalLinks = workflow.physicalPlan.links.size
+    assert(
+      numRegionLinks == numPhysicalLinks,
+      s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+    )
+  }
+
+  "CostBasedRegionPlanGenerator" should "create separate region for each operator in MATERIALIZED mode for complex csv->->filter->join->filter2 workflow" in {
+    val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+    val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
+    val keywordOpDesc2 = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(
+        headerlessCsvOpDesc1,
+        keywordOpDesc,
+        joinOpDesc,
+        keywordOpDesc2
+      ),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc1.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          headerlessCsvOpDesc1.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity(1)
+        ),
+        LogicalLink(
+          joinOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc2.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.materializedSearch()
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+
+    // All links should be materialized (represented as region links)
+    val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+    val numPhysicalLinks = workflow.physicalPlan.links.size
+    assert(
+      numRegionLinks == numPhysicalLinks,
+      s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+    )
+  }
+
 }
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
index e9b830bdfdc..93e94b4fa91 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
@@ -23,7 +23,12 @@ import org.apache.pekko.actor.{ActorSystem, Props}
 import org.apache.pekko.testkit.{ImplicitSender, TestKit}
 import org.apache.pekko.util.Timeout
 import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext, WorkflowSettings}
+import org.apache.texera.amber.core.workflow.{
+  ExecutionMode,
+  PortIdentity,
+  WorkflowContext,
+  WorkflowSettings
+}
 import org.apache.texera.amber.engine.architecture.controller._
 import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
@@ -119,7 +124,10 @@
   "Engine" should "propagate the correct batch size for headerlessCsv workflow" in {
     val expectedBatchSize = 1
 
-    val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
+    val customWorkflowSettings = WorkflowSettings(
+      dataTransferBatchSize = expectedBatchSize,
+      executionMode = ExecutionMode.PIPELINED
+    )
 
     val context = new WorkflowContext(workflowSettings = customWorkflowSettings)
 
@@ -141,7 +149,10 @@
   "Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in {
     val expectedBatchSize = 500
 
-    val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
+    val customWorkflowSettings = WorkflowSettings(
+      dataTransferBatchSize = expectedBatchSize,
+      executionMode = ExecutionMode.PIPELINED
+    )
 
     val context = new WorkflowContext(workflowSettings = customWorkflowSettings)
 
@@ -171,7 +182,10 @@
   "Engine" should "propagate the correct batch size for csv->keyword->count workflow" in {
     val expectedBatchSize = 100
 
-    val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
+    val customWorkflowSettings = WorkflowSettings(
+      dataTransferBatchSize = expectedBatchSize,
+      executionMode = ExecutionMode.PIPELINED
+    )
 
     val context = new WorkflowContext(workflowSettings = customWorkflowSettings)
 
@@ -209,7 +223,10 @@
   "Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in {
     val expectedBatchSize = 300
 
-    val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
+    val customWorkflowSettings = WorkflowSettings(
+      dataTransferBatchSize = expectedBatchSize,
+      executionMode = ExecutionMode.PIPELINED
+    )
 
     val context = new WorkflowContext(workflowSettings = customWorkflowSettings)
 
@@ -250,7 +267,10 @@
   "Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in {
     val expectedBatchSize = 1
 
-    val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
+    val customWorkflowSettings = WorkflowSettings(
+      dataTransferBatchSize = expectedBatchSize,
+      executionMode = ExecutionMode.PIPELINED
+    )
 
     val context = new WorkflowContext(workflowSettings = customWorkflowSettings)
 
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index b341c1a8f67..d8f34583b1a 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -28,7 +28,12 @@ import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.{AttributeType, Tuple}
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.{
+  ExecutionMode,
+  PortIdentity,
+  WorkflowContext,
+  WorkflowSettings
+}
 import org.apache.texera.amber.engine.architecture.controller._
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
@@ -69,6 +74,13 @@ class DataProcessingSpec
 
   val workflowContext: WorkflowContext = new WorkflowContext()
 
+  val materializedWorkflowContext: WorkflowContext = new WorkflowContext(
+    workflowSettings = WorkflowSettings(
+      dataTransferBatchSize = 400,
+      executionMode = ExecutionMode.MATERIALIZED
+    )
+  )
+
   override protected def beforeEach(): Unit = {
     setUpWorkflowExecutionData()
   }
@@ -340,4 +352,205 @@ class DataProcessingSpec
     )
     executeWorkflow(workflow)
   }
+
+  // MATERIALIZED ExecutionMode tests - each operator should be a region
+  "Engine" should "execute headerlessCsv workflow with MATERIALIZED mode" in {
+    val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
+    val workflow = buildWorkflow(
+      List(headerlessCsvOpDesc),
+      List(),
+      materializedWorkflowContext
+    )
+    val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier)
+
+    assert(results.size == 100)
+  }
+
+  "Engine" should "execute headerlessMultiLineDataCsv workflow with MATERIALIZED mode" in {
+    val headerlessCsvOpDesc = TestOperators.headerlessSmallMultiLineDataCsvScanOpDesc()
+    val workflow = buildWorkflow(
+      List(headerlessCsvOpDesc),
+      List(),
+      materializedWorkflowContext
+    )
+    val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier)
+
+    assert(results.size == 100)
+  }
+
+  "Engine" should "execute jsonl workflow with MATERIALIZED mode" in {
+    val jsonlOp = TestOperators.smallJSONLScanOpDesc()
+    val workflow = buildWorkflow(
+      List(jsonlOp),
+      List(),
+      materializedWorkflowContext
+    )
+    val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier)
+
+    assert(results.size == 100)
+
+    for (result <- results) {
+      val schema = result.getSchema
+      assert(schema.getAttribute("id").getType == AttributeType.LONG)
+      assert(schema.getAttribute("first_name").getType == AttributeType.STRING)
+      assert(schema.getAttribute("flagged").getType == AttributeType.BOOLEAN)
+      assert(schema.getAttribute("year").getType == AttributeType.INTEGER)
+      assert(schema.getAttribute("created_at").getType == AttributeType.TIMESTAMP)
+      assert(schema.getAttributes.length == 9)
+    }
+  }
+
+  "Engine" should "execute mediumFlattenJsonl workflow with MATERIALIZED mode" in {
+    val jsonlOp = TestOperators.mediumFlattenJSONLScanOpDesc()
+    val workflow = buildWorkflow(
+      List(jsonlOp),
+      List(),
+      materializedWorkflowContext
+    )
+    val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier)
+
+    assert(results.size == 1000)
+
+    for (result <- results) {
+      val schema = result.getSchema
+      assert(schema.getAttribute("id").getType == AttributeType.LONG)
+      assert(schema.getAttribute("first_name").getType == AttributeType.STRING)
+      assert(schema.getAttribute("flagged").getType == AttributeType.BOOLEAN)
+      assert(schema.getAttribute("year").getType == AttributeType.INTEGER)
+      assert(schema.getAttribute("created_at").getType == AttributeType.TIMESTAMP)
+      assert(schema.getAttribute("test_object.array2.another").getType == AttributeType.INTEGER)
+      assert(schema.getAttributes.length == 13)
+    }
+  }
+
+  "Engine" should "execute headerlessCsv->keyword workflow with MATERIALIZED mode" in {
+    val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+    val workflow = buildWorkflow(
+      List(headerlessCsvOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv workflow with MATERIALIZED mode" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val workflow = buildWorkflow(
+      List(csvOpDesc),
+      List(),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv->keyword workflow with MATERIALIZED mode" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv->keyword->count workflow with MATERIALIZED mode" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val countOpDesc =
+      TestOperators.aggregateAndGroupByDesc("Region", AggregationFunction.COUNT, List[String]())
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc, countOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(),
+          countOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv->keyword->averageAndGroupBy workflow with MATERIALIZED mode" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val averageAndGroupByOpDesc =
+      TestOperators.aggregateAndGroupByDesc(
+        "Units Sold",
+        AggregationFunction.AVERAGE,
+        List[String]("Country")
+      )
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(),
+          averageAndGroupByOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv->(csv->)->join workflow with MATERIALIZED mode" in {
+    val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
+    val workflow = buildWorkflow(
+      List(
+        headerlessCsvOpDesc1,
+        headerlessCsvOpDesc2,
+        joinOpDesc
+      ),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc1.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          headerlessCsvOpDesc2.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity(1)
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
 }
diff --git a/common/config/src/main/resources/gui.conf b/common/config/src/main/resources/gui.conf
index 8039441b130..d58d94ac7b9 100644
--- a/common/config/src/main/resources/gui.conf
+++ b/common/config/src/main/resources/gui.conf
@@ -63,8 +63,11 @@ gui {
     default-data-transfer-batch-size = 400
     default-data-transfer-batch-size = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_DATA_TRANSFER_BATCH_SIZE}
 
+    # default execution mode for workflows; can be either MATERIALIZED or PIPELINED
+    default-execution-mode = PIPELINED
+    default-execution-mode = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_EXECUTION_MODE}
+
     # whether selecting files from datasets instead of the local file system.
-    # The user system must be enabled to make this flag work!
     selecting-files-from-datasets-enabled = true
     selecting-files-from-datasets-enabled = ${?GUI_WORKFLOW_WORKSPACE_SELECTING_FILES_FROM_DATASETS_ENABLED}
 
diff --git a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
index 14016f43743..adc789c9843 100644
--- a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
+++ b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
@@ -47,6 +47,8 @@ object GuiConfig {
     conf.getBoolean("gui.workflow-workspace.auto-attribute-correction-enabled")
   val guiWorkflowWorkspaceDefaultDataTransferBatchSize: Int =
     conf.getInt("gui.workflow-workspace.default-data-transfer-batch-size")
+  val guiWorkflowWorkspaceDefaultExecutionMode: String =
+    conf.getString("gui.workflow-workspace.default-execution-mode")
   val guiWorkflowWorkspaceSelectingFilesFromDatasetsEnabled: Boolean =
     conf.getBoolean("gui.workflow-workspace.selecting-files-from-datasets-enabled")
   val guiWorkflowWorkspaceWorkflowExecutionsTrackingEnabled: Boolean =
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
new file mode 100644
index 00000000000..c02690e4e56
--- /dev/null
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.workflow;
+
+public enum ExecutionMode {
+    PIPELINED,
+    MATERIALIZED;
+
+    public static ExecutionMode fromString(String value) {
+        return valueOf(value);
+    }
+}
\ No newline at end of file
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
index ee7659d0ca9..dc4edf27824 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
@@ -30,7 +30,8 @@ object WorkflowContext {
   val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L)
   val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L)
   val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings(
-    dataTransferBatchSize = 400 // TODO: make this configurable
+    dataTransferBatchSize = 400,
+    executionMode = ExecutionMode.PIPELINED
   )
 }
 class WorkflowContext(
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
index 88ebcb068f4..f5b4a610fd3 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
@@ -21,5 +21,6 @@ package org.apache.texera.amber.core.workflow
 
 case class WorkflowSettings(
     dataTransferBatchSize: Int,
-    outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty
+    outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty,
+    executionMode: ExecutionMode
 )
diff --git a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
index 30c657746fb..b7517d81eb7 100644
--- a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
+++ b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
@@ -46,6 +46,7 @@ class ConfigResource {
       "timetravelEnabled" -> GuiConfig.guiWorkflowWorkspaceTimetravelEnabled,
       "productionSharedEditingServer" -> GuiConfig.guiWorkflowWorkspaceProductionSharedEditingServer,
       "defaultDataTransferBatchSize" -> GuiConfig.guiWorkflowWorkspaceDefaultDataTransferBatchSize,
+      "defaultExecutionMode" -> GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode,
       "workflowEmailNotificationEnabled" -> GuiConfig.guiWorkflowWorkspaceWorkflowEmailNotificationEnabled,
       "sharingComputingUnitEnabled" -> ComputingUnitConfig.sharingComputingUnitEnabled,
       "operatorConsoleMessageBufferSize" -> GuiConfig.guiWorkflowWorkspaceOperatorConsoleMessageBufferSize,
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index 73feecfdba3..9ddf4bbcc2e 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -181,6 +181,7 @@ import { AdminSettingsComponent } from "./dashboard/component/admin/settings/adm
 import { FormlyRepeatDndComponent } from "./common/formly/repeat-dnd/repeat-dnd.component";
 import { NzInputNumberModule } from "ng-zorro-antd/input-number";
 import { NzCheckboxModule } from "ng-zorro-antd/checkbox";
+import { NzRadioModule } from "ng-zorro-antd/radio";
 
 registerLocaleData(en);
 
@@ -344,6 +345,7 @@ registerLocaleData(en);
     NzProgressModule,
     NzInputNumberModule,
     NzCheckboxModule,
+    NzRadioModule,
   ],
   providers: [
     provideNzI18n(en_US),
diff --git a/frontend/src/app/common/service/gui-config.service.mock.ts b/frontend/src/app/common/service/gui-config.service.mock.ts
index daa8adfd224..179259c5a97 100644
--- a/frontend/src/app/common/service/gui-config.service.mock.ts
+++ b/frontend/src/app/common/service/gui-config.service.mock.ts
@@ -20,6 +20,7 @@
 import { Injectable } from "@angular/core";
 import { Observable, of } from "rxjs";
 import { GuiConfig } from "../type/gui-config";
+import { ExecutionMode } from "../type/workflow";
 
 /**
  * Mock GuiConfigService for testing purposes.
@@ -42,6 +43,7 @@ export class MockGuiConfigService {
     productionSharedEditingServer: false,
     pythonLanguageServerPort: "3000",
     defaultDataTransferBatchSize: 100,
+    defaultExecutionMode: ExecutionMode.PIPELINED,
     workflowEmailNotificationEnabled: false,
     sharingComputingUnitEnabled: false,
     operatorConsoleMessageBufferSize: 1000,
diff --git a/frontend/src/app/common/type/gui-config.ts b/frontend/src/app/common/type/gui-config.ts
index b47dfa0ab1b..d8786c1dc08 100644
--- a/frontend/src/app/common/type/gui-config.ts
+++ b/frontend/src/app/common/type/gui-config.ts
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+import { ExecutionMode } from "./workflow";
 
 // Please refer to core/config/src/main/resources/gui.conf for the definition of each config item
 export interface GuiConfig {
@@ -33,6 +34,7 @@ export interface GuiConfig {
   productionSharedEditingServer: boolean;
   pythonLanguageServerPort: string;
   defaultDataTransferBatchSize: number;
+  defaultExecutionMode: ExecutionMode;
   workflowEmailNotificationEnabled: boolean;
   sharingComputingUnitEnabled: boolean;
   operatorConsoleMessageBufferSize: number;
diff --git a/frontend/src/app/common/type/workflow.ts b/frontend/src/app/common/type/workflow.ts
index 6792df9d7d0..8e1c1c7e85b 100644
--- a/frontend/src/app/common/type/workflow.ts
+++ b/frontend/src/app/common/type/workflow.ts
@@ -20,8 +20,14 @@
 import { WorkflowMetadata } from "../../dashboard/type/workflow-metadata.interface";
 import { CommentBox, OperatorLink, OperatorPredicate, Point } from "../../workspace/types/workflow-common.interface";
 
+export enum ExecutionMode {
+  PIPELINED = "PIPELINED",
+  MATERIALIZED = "MATERIALIZED",
+}
+
 export interface WorkflowSettings {
   dataTransferBatchSize: number;
+  executionMode: ExecutionMode;
 }
 
 /**
diff --git a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
index e87b5acd22a..09a158e336f 100644
--- a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
+++ b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
@@ -19,7 +19,7 @@
 
 //All times in test Workflows are in PST because our local machine's timezone is PST
 
-import { Workflow, WorkflowContent } from "../../common/type/workflow";
+import { ExecutionMode, Workflow, WorkflowContent } from "../../common/type/workflow";
 import { DashboardEntry } from "../type/dashboard-entry";
 import { DashboardProject } from "../type/dashboard-project.interface";
 
@@ -39,7 +39,7 @@ export const testWorkflowContent = (operatorTypes: string[]): WorkflowContent =>
   commentBoxes: [],
   links: [],
   operatorPositions: {},
-  settings: { dataTransferBatchSize: 400 },
+  settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.PIPELINED },
 });
 
 export const testWorkflow1: Workflow = {
diff --git a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
index 4903e70bcb4..75f61a223ce 100644
--- a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
+++ b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
@@ -31,7 +31,7 @@ import { DashboardEntry, UserInfo } from "../../../type/dashboard-entry";
 import { UserService } from "../../../../common/service/user/user.service";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
 import { NotificationService } from "../../../../common/service/notification/notification.service";
-import { WorkflowContent } from "../../../../common/type/workflow";
+import { ExecutionMode, WorkflowContent } from "../../../../common/type/workflow";
 import { NzUploadFile } from "ng-zorro-antd/upload";
 import * as JSZip from "jszip";
 import { FiltersComponent } from "../filters/filters.component";
@@ -230,7 +230,10 @@ export class UserWorkflowComponent implements AfterViewInit {
       commentBoxes: [],
       links: [],
       operatorPositions: {},
-      settings: { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize },
+      settings: {
+        dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize,
+        executionMode: this.config.env.defaultExecutionMode,
+      },
     };
     let localPid = this.pid;
     this.workflowPersistService
diff --git a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
index af16bb0dee1..ceeea480af7 100644
--- a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
+++ b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
@@ -16,12 +16,25 @@
   specific language governing permissions and limitations
   under the License.
 -->
-
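
For review convenience, a minimal, self-contained sketch (not part of the patch) of how the new executionMode setting drives region planning end to end. It uses only APIs that appear in this diff; TestOperators, TestUtils.buildWorkflow, and CONTROLLER are assumed to be available on the test classpath, exactly as in the specs above.

import org.apache.texera.amber.core.workflow.{ExecutionMode, WorkflowContext, WorkflowSettings}
import org.apache.texera.amber.engine.architecture.scheduling.CostBasedScheduleGenerator
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
import org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow
import org.apache.texera.amber.operator.TestOperators

object ExecutionModeSketch extends App {
  // MATERIALIZED places every physical operator in its own region and skips the
  // cost-based search; PIPELINED keeps the existing top-down/bottom-up search.
  val context = new WorkflowContext(
    workflowSettings = WorkflowSettings(
      dataTransferBatchSize = 400,
      executionMode = ExecutionMode.MATERIALIZED
    )
  )
  val workflow = buildWorkflow(List(TestOperators.smallCsvScanOpDesc()), List(), context)
  val result = new CostBasedScheduleGenerator(workflow.context, workflow.physicalPlan, CONTROLLER)
    .materializedSearch()
  // Exactly one state is explored: the fully materialized baseline plan.
  println(s"regions=${result.regionDAG.vertexSet().size()}, statesExplored=${result.numStatesExplored}")
}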