diff --git a/README.md b/README.md index 4c8e97cc..7aaa1c0a 100644 --- a/README.md +++ b/README.md @@ -397,11 +397,94 @@ however `ActivityNew3` will get executed, since the release wasn't yet checked a every new execution of the workflow — all new activities will get executed, while `ActivityOld` will not. -Later on you can clean it up and drop all the checks if you don't have any older workflows running -or expect them to ever be executed (e.g. reset). - *NOTE: Releases with different names do not depend on each other in any way.* +## Extras + +This section describes optional extra modules included in the SDK for convenience and some +additional functionality. + +### Versioned Workflows + +Implemnting breaking changes using the previously described `#has_release?` flag can be error prone +and results in a condition build up in workflows over time. + +Another way of implementing breaking changes is by doing a full cut over to the new version every +time you need to modify a workflow. This can be achieved manually by treating new versions as +separate workflows. We've simplified this process by making your workflow aware of its versions: + +```ruby +require 'cadence/concerns/versioned' + +class MyWorkflowV1 < Cadence::Workflow + retry_policy max_attempts: 5 + + def execute + Activity2.execute! + end +end + +class MyWorkflowV2 < Cadence::Workflow + timeouts execution: 60 + + def execute + Activity2.execute! + Activity3.execute! + end +end + +class MyWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + version 1, MyWorkflowV1 + version 2, MyWorkflowV2 + + def execute + Activity1.execute! + end +end +``` + +This way you don't need to make any changes to the invocation of your workflows — calling +`Cadence.start_workflow(MyWorkflow)` will resolve the latest version and schedule `MyWorkflowV2`. +It will still appear as if you're executing `MyWorkflow` from the Cadence UI, metrics, logging, etc. +This approach allows you to easily extend your existing workflows without changing anything outside +of your workflow. + +When making a workflow versioned the main class (e.g. `MyWorkflow`) becomes the default version. +Once a workflow was scheduled its version will remain unchanged, so all the previously executed +workflows will be executed using the default implementation. Newly scheduled workflows will pick the +latest available version, but you can specify a version like this: + +```ruby +Cadence.start_workflow(MyWorkflow, options: { version: 1 }) +``` + +Once all the old versions are no longer in use you can remove those files and drop their `version` +definitions (just make sure not to change the numbers for versions that are in active use). + +In case you want to do a gradual rollout you can override the version picker with your own +implementation: + + +```ruby +class MyWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + version 1, MyWorkflowV1 + version 2, MyWorkflowV2 + version_picker do |_latest_version| + if my_feature_flag? + 2 + else + 1 + end + end + + ... +end +``` + ## Testing It is crucial to properly test your workflows and activities before running them in production. The diff --git a/examples/bin/worker b/examples/bin/worker index 91b57173..60b46056 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -28,6 +28,7 @@ worker.register_workflow(SideEffectWorkflow) worker.register_workflow(SimpleTimerWorkflow) worker.register_workflow(TimeoutWorkflow) worker.register_workflow(TripBookingWorkflow) +worker.register_workflow(VersionedWorkflow) worker.register_activity(AsyncActivity) worker.register_activity(EchoActivity) diff --git a/examples/spec/helpers.rb b/examples/spec/helpers.rb index 7a62940b..5b98187c 100644 --- a/examples/spec/helpers.rb +++ b/examples/spec/helpers.rb @@ -1,18 +1,17 @@ require 'securerandom' +require 'cadence/workflow/history' module Helpers def run_workflow(workflow, *input, **args) workflow_id = SecureRandom.uuid - run_id = Cadence.start_workflow( - workflow, - *input, - **args.merge(options: { workflow_id: workflow_id }) - ) + args[:options] = args.fetch(:options, {}).merge(workflow_id: workflow_id) + + run_id = Cadence.start_workflow(workflow, *input, **args) client = Cadence.send(:default_client) connection = client.send(:connection) - connection.get_workflow_execution_history( + result = connection.get_workflow_execution_history( domain: Cadence.configuration.domain, workflow_id: workflow_id, run_id: run_id, @@ -20,5 +19,7 @@ def run_workflow(workflow, *input, **args) wait_for_new_event: true, event_type: :close ) + + Cadence::Workflow::History.new(result.history.events) end end diff --git a/examples/spec/integration/serial_hello_world_workflow_spec.rb b/examples/spec/integration/serial_hello_world_workflow_spec.rb index 04b4b372..d6123ae4 100644 --- a/examples/spec/integration/serial_hello_world_workflow_spec.rb +++ b/examples/spec/integration/serial_hello_world_workflow_spec.rb @@ -4,7 +4,6 @@ it 'completes' do result = run_workflow(described_class, 'Alice', 'Bob', 'John') - expect(result.history.events.first.eventType) - .to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) + expect(result.events.first.type).to eq('WorkflowExecutionCompleted') end end diff --git a/examples/spec/integration/versioned_workflow_spec.rb b/examples/spec/integration/versioned_workflow_spec.rb new file mode 100644 index 00000000..b4567853 --- /dev/null +++ b/examples/spec/integration/versioned_workflow_spec.rb @@ -0,0 +1,76 @@ +require 'workflows/versioned_workflow' +require 'cadence/json' + +describe VersionedWorkflow, :integration do + context 'when scheduling' do + context 'without explicit version' do + it 'executes the latest version' do + result = run_workflow(described_class) + + event = result.events.first + + expect(event.type).to eq('WorkflowExecutionCompleted') + expect(Cadence::JSON.deserialize(event.attributes.result)).to eq('ECHO: version 2') + end + end + + context 'with explicit version' do + let(:options) { { options: { version: 1 } } } + + it 'executes the specified version' do + result = run_workflow(described_class, options) + + event = result.events.first + + expect(event.type).to eq('WorkflowExecutionCompleted') + expect(Cadence::JSON.deserialize(event.attributes.result)).to eq('ECHO: version 1') + end + end + + context 'with a non-existing version' do + let(:options) { { options: { version: 3 } } } + + it 'raises an error' do + expect do + run_workflow(described_class, options) + end.to raise_error( + Cadence::Concerns::Versioned::UnknownWorkflowVersion, + 'Unknown version 3 for VersionedWorkflow' + ) + end + end + end + + context 'when already scheduled' do + context 'without a version' do + it 'executes the default version' do + # starting with a plain string to skip the automatic header setting + result = run_workflow('VersionedWorkflow') + + event = result.events.first + + expect(event.type).to eq('WorkflowExecutionCompleted') + expect(Cadence::JSON.deserialize(event.attributes.result)).to eq('ECHO: default version') + end + end + + context 'with a non-existing version' do + let(:options) do + { + options: { + timeouts: { execution: 1 }, + headers: { 'Version' => '3' } + } + } + end + + it 'times out the workflow' do + result = run_workflow('VersionedWorkflow', options) + + event = result.events.first + + expect(event.type).to eq('WorkflowExecutionTimedOut') + end + end + end +end diff --git a/examples/workflows/versioned_workflow.rb b/examples/workflows/versioned_workflow.rb new file mode 100644 index 00000000..95f31024 --- /dev/null +++ b/examples/workflows/versioned_workflow.rb @@ -0,0 +1,16 @@ +require 'cadence/concerns/versioned' +require_relative './versioned_workflow_v1' +require_relative './versioned_workflow_v2' + +class VersionedWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + headers 'MyHeader' => 'MyValue' + + version 1, VersionedWorkflowV1 + version 2, VersionedWorkflowV2 + + def execute + EchoActivity.execute!('default version') + end +end diff --git a/examples/workflows/versioned_workflow_v1.rb b/examples/workflows/versioned_workflow_v1.rb new file mode 100644 index 00000000..282118a8 --- /dev/null +++ b/examples/workflows/versioned_workflow_v1.rb @@ -0,0 +1,5 @@ +class VersionedWorkflowV1 < Cadence::Workflow + def execute + EchoActivity.execute!('version 1') + end +end diff --git a/examples/workflows/versioned_workflow_v2.rb b/examples/workflows/versioned_workflow_v2.rb new file mode 100644 index 00000000..47d76303 --- /dev/null +++ b/examples/workflows/versioned_workflow_v2.rb @@ -0,0 +1,7 @@ +class VersionedWorkflowV2 < Cadence::Workflow + headers 'MyNewHeader' => 'MyNewValue' + + def execute + EchoActivity.execute!('version 2') + end +end diff --git a/lib/cadence/concerns/versioned.rb b/lib/cadence/concerns/versioned.rb new file mode 100644 index 00000000..8c4391c7 --- /dev/null +++ b/lib/cadence/concerns/versioned.rb @@ -0,0 +1,101 @@ +require 'cadence/errors' + +module Cadence + module Concerns + module Versioned + def self.included(base) + base.extend ClassMethods + end + + VERSION_HEADER_NAME = 'Version'.freeze + DEFAULT_VERSION = 0 + + class UnknownWorkflowVersion < Cadence::ClientError; end + + class Workflow + attr_reader :version, :main_class, :version_class + + def initialize(main_class, version = nil) + version ||= main_class.pick_version + version_class = main_class.version_class_for(version) + + @version = version + @main_class = main_class + @version_class = version_class + end + + def domain + if version_class.domain + warn '[WARNING] Overriding domain in a workflow version is not yet supported. ' \ + "Called from #{version_class}." + end + + main_class.domain + end + + def task_list + if version_class.task_list + warn '[WARNING] Overriding task_list in a workflow version is not yet supported. ' \ + "Called from #{version_class}." + end + + main_class.task_list + end + + def retry_policy + version_class.retry_policy || main_class.retry_policy + end + + def timeouts + version_class.timeouts || main_class.timeouts + end + + def headers + (version_class.headers || main_class.headers || {}).merge(VERSION_HEADER_NAME => version.to_s) + end + end + + module ClassMethods + def version(number, workflow_class) + versions[number] = workflow_class + end + + def execute_in_context(context, input) + version = context.headers.fetch(VERSION_HEADER_NAME, DEFAULT_VERSION).to_i + version_class = version_class_for(version) + + if self == version_class + super + else + # forward the method call to the target version class + version_class.execute_in_context(context, input) + end + end + + def version_class_for(version) + versions.fetch(version.to_i) do + raise UnknownWorkflowVersion, "Unknown version #{version} for #{self.name}" + end + end + + def pick_version + version_picker.call(versions.keys.max) + end + + private + + DEFAULT_VERSION_PICKER = lambda { |latest_version| latest_version } + + def version_picker(&block) + return @version_picker || DEFAULT_VERSION_PICKER unless block_given? + @version_picker = block + end + + def versions + # Initialize with the default version + @versions ||= { DEFAULT_VERSION => self } + end + end + end + end +end diff --git a/lib/cadence/execution_options.rb b/lib/cadence/execution_options.rb index ab4ab226..82a6cb9d 100644 --- a/lib/cadence/execution_options.rb +++ b/lib/cadence/execution_options.rb @@ -1,4 +1,5 @@ require 'cadence/concerns/executable' +require 'cadence/concerns/versioned' require 'cadence/retry_policy' module Cadence @@ -17,6 +18,9 @@ def initialize(object, options, defaults = nil) # For Cadence::Workflow and Cadence::Activity use defined values as the next option if object.singleton_class.included_modules.include?(Concerns::Executable) + # In a versioned workflow merge the specific version options with default workflow options + object = Concerns::Versioned::Workflow.new(object, options[:version]) if versioned?(object) + @domain ||= object.domain @task_list ||= object.task_list @retry_policy = object.retry_policy.merge(@retry_policy) if object.retry_policy @@ -41,5 +45,11 @@ def initialize(object, options, defaults = nil) freeze end + + private + + def versioned?(workflow) + workflow.singleton_class.included_modules.include?(Concerns::Versioned::ClassMethods) + end end end diff --git a/lib/cadence/worker.rb b/lib/cadence/worker.rb index a3632950..5b1cb23d 100644 --- a/lib/cadence/worker.rb +++ b/lib/cadence/worker.rb @@ -3,6 +3,7 @@ require 'cadence/execution_options' require 'cadence/executable_lookup' require 'cadence/middleware/entry' +require 'cadence/concerns/versioned' module Cadence class Worker @@ -18,7 +19,12 @@ def initialize(config = Cadence.configuration, **options) end def register_workflow(workflow_class, options = {}) - execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) + execution_options = ExecutionOptions.new( + workflow_class, + # ensure default version to avoid executing version picker in worker context + options.merge(version: Cadence::Concerns::Versioned::DEFAULT_VERSION), + config.default_execution_options + ) key = [execution_options.domain, execution_options.task_list] @workflows[key].add(execution_options.name, workflow_class) diff --git a/spec/unit/lib/cadence/concerns/versioned_spec.rb b/spec/unit/lib/cadence/concerns/versioned_spec.rb new file mode 100644 index 00000000..c8e2fd7c --- /dev/null +++ b/spec/unit/lib/cadence/concerns/versioned_spec.rb @@ -0,0 +1,237 @@ +require 'cadence/concerns/versioned' +require 'cadence/workflow/context' + +describe Cadence::Concerns::Versioned do + class TestVersionedWorkflowV1 < Cadence::Workflow + end + + class TestVersionedWorkflowV2 < Cadence::Workflow + domain 'new-domain' + task_list 'new-task-list' + retry_policy interval: 5, backoff: 1, max_attempts: 2 + timeouts execution: 1 + headers 'HeaderV2' => 'TestV2' + end + + class TestVersionedWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + domain 'domain' + task_list 'task-list' + retry_policy interval: 1, backoff: 2, max_attempts: 5 + timeouts start_to_close: 10 + headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB' + + version 1, TestVersionedWorkflowV1 + version 2, TestVersionedWorkflowV2 + end + + describe described_class::Workflow do + subject { described_class.new(TestVersionedWorkflow, version) } + + describe '#initialize' do + before { allow(TestVersionedWorkflow).to receive(:pick_version).and_call_original } + + context 'when passed no version header' do + let(:version) { nil } + + it 'initializes the latest version' do + expect(subject.version).to eq(2) + expect(subject.main_class).to eq(TestVersionedWorkflow) + expect(subject.version_class).to eq(TestVersionedWorkflowV2) + end + + it 'calls version picker' do + subject + + expect(TestVersionedWorkflow).to have_received(:pick_version) + end + end + + context 'when passed a specific version header' do + let(:version) { 1 } + + it 'initializes the specified version' do + expect(subject.version).to eq(1) + expect(subject.main_class).to eq(TestVersionedWorkflow) + expect(subject.version_class).to eq(TestVersionedWorkflowV1) + expect(TestVersionedWorkflow).not_to have_received(:pick_version) + end + end + + context 'when passed a non-existing version' do + let(:version) { 3 } + + it 'raises UnknownWorkflowVersion' do + expect { subject }.to raise_error( + Cadence::Concerns::Versioned::UnknownWorkflowVersion, + 'Unknown version 3 for TestVersionedWorkflow' + ) + expect(TestVersionedWorkflow).not_to have_received(:pick_version) + end + end + end + + context 'when version does not override the attributes' do + let(:version) { 1 } + + before { allow(subject).to receive(:warn) } + + describe '#domain' do + it 'returns default version domain' do + expect(subject.domain).to eq('domain') + expect(subject).not_to have_received(:warn) + end + end + + describe '#task_list' do + it 'returns default version task_list' do + expect(subject.task_list).to eq('task-list') + expect(subject).not_to have_received(:warn) + end + end + + describe '#retry_policy' do + it 'returns default version retry_policy' do + expect(subject.retry_policy).to eq(interval: 1, backoff: 2, max_attempts: 5) + end + end + + describe '#timeouts' do + it 'returns default version timeouts' do + expect(subject.timeouts).to eq(start_to_close: 10) + end + end + + describe '#headers' do + it 'returns default version headers including version header' do + expect(subject.headers).to eq( + 'HeaderA' => 'TestA', + 'HeaderB' => 'TestB', + Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '1' + ) + end + end + end + + context 'when version overwrites the attribute' do + let(:version) { 2 } + + before { allow(subject).to receive(:warn) } + + describe '#domain' do + it 'returns default version domain and warns' do + expect(subject.domain).to eq('domain') + expect(subject) + .to have_received(:warn) + .with('[WARNING] Overriding domain in a workflow version is not yet supported. ' \ + 'Called from TestVersionedWorkflowV2.') + end + end + + describe '#task_list' do + it 'returns default version task_list and warns' do + expect(subject.task_list).to eq('task-list') + expect(subject) + .to have_received(:warn) + .with('[WARNING] Overriding task_list in a workflow version is not yet supported. ' \ + 'Called from TestVersionedWorkflowV2.') + end + end + + describe '#retry_policy' do + it 'returns overriden retry_policy' do + expect(subject.retry_policy).to eq(interval: 5, backoff: 1, max_attempts: 2) + end + end + + describe '#timeouts' do + it 'returns overriden timeouts' do + expect(subject.timeouts).to eq(execution: 1) + end + end + + describe '#headers' do + it 'returns overriden headers including version header' do + expect(subject.headers).to eq( + 'HeaderV2' => 'TestV2', + Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '2' + ) + end + end + end + end + + describe described_class::ClassMethods do + subject { TestVersionedWorkflow } + + describe '.version' do + after { TestVersionedWorkflow.send(:versions).delete(4) } + + it 'adds a new version' do + subject.version(4, TestVersionedWorkflowV1) + + expect(subject.version_class_for(4)).to eq(TestVersionedWorkflowV1) + end + end + + describe '.execute_in_context' do + let(:context) { instance_double(Cadence::Workflow::Context, headers: headers) } + + context 'when called with a non-default version' do + let(:headers) { { Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '2' } } + before { allow(TestVersionedWorkflowV2).to receive(:execute_in_context) } + + it 'calls version' do + subject.execute_in_context(context, nil) + + expect(TestVersionedWorkflowV2).to have_received(:execute_in_context).with(context, nil) + end + end + end + + describe '.version_class_for' do + context 'when given a valid version' do + it 'returns version class' do + expect(subject.version_class_for(2)).to eq(TestVersionedWorkflowV2) + end + end + + context 'when given a default version' do + it 'returns default version class' do + expect(subject.version_class_for(Cadence::Concerns::Versioned::DEFAULT_VERSION)) + .to eq(TestVersionedWorkflow) + end + end + + context 'when given an invalid version' do + it 'raises UnknownWorkflowVersion' do + expect { subject.version_class_for(3) }.to raise_error( + Cadence::Concerns::Versioned::UnknownWorkflowVersion, + 'Unknown version 3 for TestVersionedWorkflow' + ) + end + end + end + + describe '.pick_version' do + class TestPickVersionWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + version_picker { |latest_version| latest_version + 42 } + end + + context 'when using default version picker' do + it 'returns the latest version' do + expect(TestVersionedWorkflow.pick_version).to eq(2) + end + end + + context 'when using overriden version picker' do + it 'returns a custom version' do + expect(TestPickVersionWorkflow.pick_version).to eq(42) + end + end + end + end +end diff --git a/spec/unit/lib/cadence/execution_options_spec.rb b/spec/unit/lib/cadence/execution_options_spec.rb index 1d82720a..81accfc7 100644 --- a/spec/unit/lib/cadence/execution_options_spec.rb +++ b/spec/unit/lib/cadence/execution_options_spec.rb @@ -1,5 +1,7 @@ require 'cadence/execution_options' require 'cadence/configuration' +require 'cadence/workflow' +require 'cadence/concerns/versioned' describe Cadence::ExecutionOptions do subject { described_class.new(object, options, defaults) } @@ -195,4 +197,90 @@ class TestWorkflow < Cadence::Workflow end end end + + context 'when initialized with a Versioned workflow' do + class TestVersionedExecutionOptionsWorkflowV1 < Cadence::Workflow + retry_policy interval: 5, backoff: 1, max_attempts: 2 + timeouts execution: 1 + end + + class TestVersionedExecutionOptionsWorkflowV2 < Cadence::Workflow + domain 'new-domain' + task_list 'new-task-list' + headers 'HeaderV2' => 'TestV2' + end + + class TestVersionedExecutionOptionsWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + domain 'domain' + task_list 'task-list' + retry_policy interval: 1, backoff: 2, max_attempts: 5 + timeouts start_to_close: 10 + headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB' + + version 1, TestVersionedExecutionOptionsWorkflowV1 + version 2, TestVersionedExecutionOptionsWorkflowV2 + end + + let(:object) { TestVersionedExecutionOptionsWorkflow } + let(:options) { {} } + + context 'when initialized without the version header' do + it 'is initialized with a mix of latest version and default version values' do + expect(subject.name).to eq(object.name) + expect(subject.domain).to eq('domain') + expect(subject.task_list).to eq('task-list') + expect(subject.retry_policy).to be_an_instance_of(Cadence::RetryPolicy) + expect(subject.retry_policy.interval).to eq(1) + expect(subject.retry_policy.backoff).to eq(2) + expect(subject.retry_policy.max_attempts).to eq(5) + expect(subject.timeouts).to eq(start_to_close: 10) + expect(subject.headers).to eq( + 'HeaderV2' => 'TestV2', + 'Version' => '2' + ) + end + end + + context 'when initialized with the version header' do + let(:options) { { version: 1 } } + + it 'is initialized with a mix of specified version and default version values' do + expect(subject.name).to eq(object.name) + expect(subject.domain).to eq('domain') + expect(subject.task_list).to eq('task-list') + expect(subject.retry_policy).to be_an_instance_of(Cadence::RetryPolicy) + expect(subject.retry_policy.interval).to eq(5) + expect(subject.retry_policy.backoff).to eq(1) + expect(subject.retry_policy.max_attempts).to eq(2) + expect(subject.timeouts).to eq(execution: 1) + expect(subject.headers).to eq( + 'HeaderA' => 'TestA', + 'HeaderB' => 'TestB', + 'Version' => '1' + ) + end + end + + context 'when initialized with the default version' do + let(:options) { { version: 0 } } + + it 'is initialized with a default version values' do + expect(subject.name).to eq(object.name) + expect(subject.domain).to eq('domain') + expect(subject.task_list).to eq('task-list') + expect(subject.retry_policy).to be_an_instance_of(Cadence::RetryPolicy) + expect(subject.retry_policy.interval).to eq(1) + expect(subject.retry_policy.backoff).to eq(2) + expect(subject.retry_policy.max_attempts).to eq(5) + expect(subject.timeouts).to eq(start_to_close: 10) + expect(subject.headers).to eq( + 'HeaderA' => 'TestA', + 'HeaderB' => 'TestB', + 'Version' => '0' + ) + end + end + end end