Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import static io.temporal.samples.batch.iterator.IteratorBatchWorker.TASK_QUEUE;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.List;

/** Starts a single execution of IteratorBatchWorkflow. */
public class IteratorBatchStarter {
Expand All @@ -16,12 +16,8 @@ public static void main(String[] args) {
WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build();
IteratorBatchWorkflow batchWorkflow =
workflowClient.newWorkflowStub(IteratorBatchWorkflow.class, options);
WorkflowExecution execution = WorkflowClient.start(batchWorkflow::processBatch, 5, 0);
System.out.println(
"Started batch workflow. WorkflowId="
+ execution.getWorkflowId()
+ ", RunId="
+ execution.getRunId());
List<SingleResponse> responses = batchWorkflow.processBatch(100, 0);
System.out.println("Responses=" + responses.toString());
System.exit(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public static void main(String[] args) {
Worker worker = factory.newWorker(TASK_QUEUE);

worker.registerWorkflowImplementationTypes(
IteratorBatchWorkflowImpl.class, RecordProcessorWorkflowImpl.class);
IteratorBatchWorkflowImpl.class,
SingleListingMigrationWorkflowImpl.class,
ListingMigrationWorkflowImpl.class);

worker.registerActivitiesImplementations(new RecordLoaderImpl());
factory.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.List;

@WorkflowInterface
public interface IteratorBatchWorkflow {
Expand All @@ -14,5 +15,5 @@ public interface IteratorBatchWorkflow {
* @return total number of processed records.
*/
@WorkflowMethod
int processBatch(int pageSize, int offset);
List<SingleResponse> processBatch(int pageSize, int offset);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ public final class IteratorBatchWorkflowImpl implements IteratorBatchWorkflow {
Workflow.newContinueAsNewStub(IteratorBatchWorkflow.class);

@Override
public int processBatch(int pageSize, int offset) {
public List<SingleResponse> processBatch(int pageSize, int offset) {
// Loads a page of records
List<SingleRecord> records = recordLoader.getRecords(pageSize, offset);
// Starts a child per record asynchrnously.
List<Promise<Void>> results = new ArrayList<>(records.size());
List<Promise<SingleResponse>> results = new ArrayList<>(records.size());
for (SingleRecord record : records) {
// Uses human friendly child id.
String childId = Workflow.getInfo().getWorkflowId() + "/" + record.getId();
RecordProcessorWorkflow processor =
SingleListingMigrationWorkflow processor =
Workflow.newChildWorkflowStub(
RecordProcessorWorkflow.class,
SingleListingMigrationWorkflow.class,
ChildWorkflowOptions.newBuilder().setWorkflowId(childId).build());
Promise<Void> result = Async.procedure(processor::processRecord, record);
Promise<SingleResponse> result = Async.function(processor::processRecord, record);
results.add(result);
}
// Waits for all children to complete.
Expand All @@ -54,7 +54,7 @@ public int processBatch(int pageSize, int offset) {

// No more records in the dataset. Completes the workflow.
if (records.isEmpty()) {
return offset;
return List.of();
}

// Continues as new with the increased offset.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.temporal.samples.batch.iterator;

import static io.temporal.samples.batch.iterator.IteratorBatchWorker.TASK_QUEUE;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;

public class ListingMigrationStarter {

public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient workflowClient = WorkflowClient.newInstance(service);
WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build();
ListingMigrationWorkflow migrationWorkflow =
workflowClient.newWorkflowStub(ListingMigrationWorkflow.class, options);
migrationWorkflow.execute();
System.exit(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.temporal.samples.batch.iterator;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

@WorkflowInterface
public interface ListingMigrationWorkflow {

@WorkflowMethod
void execute();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.temporal.samples.batch.iterator;

import io.temporal.workflow.Workflow;

public class ListingMigrationWorkflowImpl implements ListingMigrationWorkflow {
@Override
public void execute() {
var iteratorBatchWorkflow = Workflow.newChildWorkflowStub(IteratorBatchWorkflow.class);
iteratorBatchWorkflow.processBatch(100, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ public final class RecordLoaderImpl implements RecordLoader {

// The sample always returns 5 pages.
// The real application would iterate over an existing dataset or file.
public static final int PAGE_COUNT = 5;
public static final int MAX_COUNT = 1000;

@Override
public List<SingleRecord> getRecords(int pageSize, int offset) {
List<SingleRecord> records = new ArrayList<>(pageSize);
if (offset < pageSize * PAGE_COUNT) {
if (offset < MAX_COUNT) {
for (int i = 0; i < pageSize; i++) {
records.add(new SingleRecord(offset + i));
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

/** Workflow that implements processing of a single record. */
@WorkflowInterface
public interface RecordProcessorWorkflow {
public interface SingleListingMigrationWorkflow {

/** Processes a single record */
@WorkflowMethod
void processRecord(SingleRecord r);
SingleResponse processRecord(SingleRecord r);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.temporal.samples.batch.iterator;

import io.temporal.workflow.Workflow;
import java.time.Duration;
import java.util.Random;
import org.slf4j.Logger;

/** Fake RecordProcessorWorkflow implementation. */
public class SingleListingMigrationWorkflowImpl implements SingleListingMigrationWorkflow {
public static final Logger log = Workflow.getLogger(SingleListingMigrationWorkflowImpl.class);
private final Random random = Workflow.newRandom();

@Override
public SingleResponse processRecord(SingleRecord r) {
// Simulate some processing
int result = random.nextInt(30) + r.getId();
Workflow.sleep(Duration.ofSeconds(random.nextInt(5)));
log.info("Processed {}, result={}", r, result);
return new SingleResponse(r.getId(), result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.temporal.samples.batch.iterator;

public class SingleResponse {

private int input;
private int result;

public SingleResponse() {}

public SingleResponse(int input, int result) {
this.input = input;
this.result = result;
}

public int getInput() {
return input;
}

public int getResult() {
return result;
}

@Override
public String toString() {
return "SingleResponse{" + "input=" + input + ", result=" + result + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ private static void waitForWorkerAndMakeCurrent(
break;
}
} catch (Exception ignored) {
System.out.println("Worker deployment not found yet, retrying...");
}
Thread.sleep(1000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public class IteratorIteratorBatchWorkflowTest {
/** The sample RecordLoaderImpl always returns the fixed number pages. */
private static boolean[] processedRecords = new boolean[PAGE_SIZE * PAGE_COUNT];

public static class TestRecordProcessorWorkflowImpl implements RecordProcessorWorkflow {
public static class TestSingleListingMigrationWorkflowImpl
implements SingleListingMigrationWorkflow {

@Override
public void processRecord(SingleRecord r) {
Expand All @@ -27,7 +28,8 @@ public void processRecord(SingleRecord r) {
@Rule
public TestWorkflowRule testWorkflowRule =
TestWorkflowRule.newBuilder()
.setWorkflowTypes(IteratorBatchWorkflowImpl.class, TestRecordProcessorWorkflowImpl.class)
.setWorkflowTypes(
IteratorBatchWorkflowImpl.class, TestSingleListingMigrationWorkflowImpl.class)
.setActivityImplementations(new RecordLoaderImpl())
.build();

Expand Down