From 452dc9e02271fa6ba8a8df8a7ac63a700e8cdf97 Mon Sep 17 00:00:00 2001 From: Anthony Dmitriyev Date: Thu, 27 Jan 2022 13:47:18 +0000 Subject: [PATCH 1/7] Implement a versioned workflow concern --- examples/bin/worker | 1 + .../integration/versioned_workflow_spec.rb | 36 ++++++++ examples/workflows/versioned_workflow.rb | 16 ++++ examples/workflows/versioned_workflow/v1.rb | 7 ++ examples/workflows/versioned_workflow/v2.rb | 9 ++ lib/cadence/concerns/versioned.rb | 91 +++++++++++++++++++ 6 files changed, 160 insertions(+) create mode 100644 examples/spec/integration/versioned_workflow_spec.rb create mode 100644 examples/workflows/versioned_workflow.rb create mode 100644 examples/workflows/versioned_workflow/v1.rb create mode 100644 examples/workflows/versioned_workflow/v2.rb create mode 100644 lib/cadence/concerns/versioned.rb diff --git a/examples/bin/worker b/examples/bin/worker index 91b57173..60b46056 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -28,6 +28,7 @@ worker.register_workflow(SideEffectWorkflow) worker.register_workflow(SimpleTimerWorkflow) worker.register_workflow(TimeoutWorkflow) worker.register_workflow(TripBookingWorkflow) +worker.register_workflow(VersionedWorkflow) worker.register_activity(AsyncActivity) worker.register_activity(EchoActivity) diff --git a/examples/spec/integration/versioned_workflow_spec.rb b/examples/spec/integration/versioned_workflow_spec.rb new file mode 100644 index 00000000..808bac36 --- /dev/null +++ b/examples/spec/integration/versioned_workflow_spec.rb @@ -0,0 +1,36 @@ +require 'workflows/versioned_workflow' + +describe VersionedWorkflow, :integration do + context 'without explicit version' do + it 'executes latest version' do + result = run_workflow(described_class) + + event = result.history.events.first + + expect(event.eventType).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) + expect(event.result).to eq('ECHO: version 2') + end + end + + context 'with explicit version' do + it 'executes specified version' do + result = run_workflow(described_class, options: { headers: { 'Version' => '1' }}) + + event = result.history.events.first + + expect(event.eventType).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) + expect(event.result).to eq('ECHO: version 1') + end + end + + context 'with a missing version' do + it 'executes default version' do + result = run_workflow(described_class, options: { headers: { 'Version' => nil }}) + + event = result.history.events.first + + expect(event.eventType).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) + expect(event.result).to eq('ECHO: default version') + end + end +end diff --git a/examples/workflows/versioned_workflow.rb b/examples/workflows/versioned_workflow.rb new file mode 100644 index 00000000..8e990ee1 --- /dev/null +++ b/examples/workflows/versioned_workflow.rb @@ -0,0 +1,16 @@ +require 'cadence/concerns/versioned' +require_relative './versioned_workflow/v1' +require_relative './versioned_workflow/v2' + +class VersionedWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + headers 'MyHeader' => 'MyValue' + + version 1, V1 + version 2, V2 + + def execute + EchoActivity.execute!('default version') + end +end diff --git a/examples/workflows/versioned_workflow/v1.rb b/examples/workflows/versioned_workflow/v1.rb new file mode 100644 index 00000000..e8ccaa9a --- /dev/null +++ b/examples/workflows/versioned_workflow/v1.rb @@ -0,0 +1,7 @@ +class VersionedWorkflow < Cadence::Workflow + class V1 < Cadence::Workflow + def execute + EchoActivity.execute!('version 1') + end + end +end diff --git a/examples/workflows/versioned_workflow/v2.rb b/examples/workflows/versioned_workflow/v2.rb new file mode 100644 index 00000000..4b79df57 --- /dev/null +++ b/examples/workflows/versioned_workflow/v2.rb @@ -0,0 +1,9 @@ +class VersionedWorkflow < Cadence::Workflow + class V2 < Cadence::Workflow + headers 'MyNewHeader' => 'MyNewValue' + + def execute + EchoActivity.execute!('version 2') + end + end +end diff --git a/lib/cadence/concerns/versioned.rb b/lib/cadence/concerns/versioned.rb new file mode 100644 index 00000000..6d13ec53 --- /dev/null +++ b/lib/cadence/concerns/versioned.rb @@ -0,0 +1,91 @@ +require 'cadence/errors' + +module Cadence + module Concerns + module Versioned + def self.included(base) + base.extend ClassMethods + end + + VERSION_HEADER_NAME = 'Version'.freeze + DEFAULT_VERSION = 0 + + class UnknownWorkflowVersion < Cadence::ClientError; end + + module ClassMethods + def execute_in_context(context, input) + target_version = context.headers[VERSION_HEADER_NAME].to_i + workflow_class = version_class_for(target_version) + + if !workflow_class + raise UnknownWorkflowVersion, "Unknown version #{target_version} for #{self.name}" + end + + super + end + + def version(number, workflow_class) + versions[number] = workflow_class + end + + def domain(*args) + return version_class_for(latest_version).domain || @domain if args.empty? + @domain = args.first + end + + def task_list(*args) + return version_class_for(latest_version).task_list || @task_list if args.empty? + @task_list = args.first + end + + def retry_policy(*args) + return version_class_for(latest_version).retry_policy || @retry_policy if args.empty? + @retry_policy = Cadence::RetryPolicy.new(args.first) + @retry_policy.validate! + end + + def timeouts(*args) + return version_class_for(latest_version).timeouts || @timeouts if args.empty? + @timeouts = args.first + end + + def headers(*args) + if args.empty? + headers = version_class_for(latest_version).headers || @headers + + if headers.key?(VERSION_HEADER_NAME) + warn "[WARNING] #{VERSION_HEADER_NAME} header collision" + end + + return headers.merge(VERSION_HEADER_NAME => latest_version.to_s) + end + + super + end + + def new(context) + target_version = context.headers[VERSION_HEADER_NAME].to_i + workflow_class = version_class_for(target_version) + + # Swap top-level class with version-specific class + workflow_class.new(context) + end + + private + + def versions + # Initialize with the default version + @versions ||= { DEFAULT_VERSION => self } + end + + def version_class_for(version) + versions[version] + end + + def latest_version + versions.keys.max + end + end + end + end +end From c97771fce93ba8f8b551177973c7409fcc121fcd Mon Sep 17 00:00:00 2001 From: Anthony Dmitriyev Date: Thu, 27 Jan 2022 13:55:13 +0000 Subject: [PATCH 2/7] fixup! Implement a versioned workflow concern --- examples/spec/helpers.rb | 5 ++++- .../integration/versioned_workflow_spec.rb | 18 +++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/examples/spec/helpers.rb b/examples/spec/helpers.rb index 7a62940b..847017fb 100644 --- a/examples/spec/helpers.rb +++ b/examples/spec/helpers.rb @@ -1,4 +1,5 @@ require 'securerandom' +require 'cadence/workflow/history' module Helpers def run_workflow(workflow, *input, **args) @@ -12,7 +13,7 @@ def run_workflow(workflow, *input, **args) client = Cadence.send(:default_client) connection = client.send(:connection) - connection.get_workflow_execution_history( + result = connection.get_workflow_execution_history( domain: Cadence.configuration.domain, workflow_id: workflow_id, run_id: run_id, @@ -20,5 +21,7 @@ def run_workflow(workflow, *input, **args) wait_for_new_event: true, event_type: :close ) + + Cadence::History.new(result.history.events) end end diff --git a/examples/spec/integration/versioned_workflow_spec.rb b/examples/spec/integration/versioned_workflow_spec.rb index 808bac36..b39fd94a 100644 --- a/examples/spec/integration/versioned_workflow_spec.rb +++ b/examples/spec/integration/versioned_workflow_spec.rb @@ -5,10 +5,10 @@ it 'executes latest version' do result = run_workflow(described_class) - event = result.history.events.first + event = result.events.first - expect(event.eventType).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) - expect(event.result).to eq('ECHO: version 2') + expect(event.type).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) + expect(event.attributes.result).to eq('ECHO: version 2') end end @@ -16,10 +16,10 @@ it 'executes specified version' do result = run_workflow(described_class, options: { headers: { 'Version' => '1' }}) - event = result.history.events.first + event = result.events.first - expect(event.eventType).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) - expect(event.result).to eq('ECHO: version 1') + expect(event.type).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) + expect(event.attributes.result).to eq('ECHO: version 1') end end @@ -27,10 +27,10 @@ it 'executes default version' do result = run_workflow(described_class, options: { headers: { 'Version' => nil }}) - event = result.history.events.first + event = result.events.first - expect(event.eventType).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) - expect(event.result).to eq('ECHO: default version') + expect(event.type).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) + expect(event.attributes.result).to eq('ECHO: default version') end end end From 92cadeab3644d00f212ac3d117ff15cb0dc47fa1 Mon Sep 17 00:00:00 2001 From: Anthony Dmitriyev Date: Thu, 27 Jan 2022 13:56:37 +0000 Subject: [PATCH 3/7] fixup! Implement a versioned workflow concern --- examples/spec/helpers.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/spec/helpers.rb b/examples/spec/helpers.rb index 847017fb..efde6038 100644 --- a/examples/spec/helpers.rb +++ b/examples/spec/helpers.rb @@ -22,6 +22,6 @@ def run_workflow(workflow, *input, **args) event_type: :close ) - Cadence::History.new(result.history.events) + Cadence::Workflow::History.new(result.history.events) end end From b388965745e4fea75bb56290ca9e23c41623cac0 Mon Sep 17 00:00:00 2001 From: antstorm Date: Wed, 16 Feb 2022 16:18:44 +0000 Subject: [PATCH 4/7] Fix failing spec --- examples/spec/helpers.rb | 8 +++----- .../spec/integration/serial_hello_world_workflow_spec.rb | 3 +-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/examples/spec/helpers.rb b/examples/spec/helpers.rb index efde6038..5b98187c 100644 --- a/examples/spec/helpers.rb +++ b/examples/spec/helpers.rb @@ -4,11 +4,9 @@ module Helpers def run_workflow(workflow, *input, **args) workflow_id = SecureRandom.uuid - run_id = Cadence.start_workflow( - workflow, - *input, - **args.merge(options: { workflow_id: workflow_id }) - ) + args[:options] = args.fetch(:options, {}).merge(workflow_id: workflow_id) + + run_id = Cadence.start_workflow(workflow, *input, **args) client = Cadence.send(:default_client) connection = client.send(:connection) diff --git a/examples/spec/integration/serial_hello_world_workflow_spec.rb b/examples/spec/integration/serial_hello_world_workflow_spec.rb index 04b4b372..d6123ae4 100644 --- a/examples/spec/integration/serial_hello_world_workflow_spec.rb +++ b/examples/spec/integration/serial_hello_world_workflow_spec.rb @@ -4,7 +4,6 @@ it 'completes' do result = run_workflow(described_class, 'Alice', 'Bob', 'John') - expect(result.history.events.first.eventType) - .to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) + expect(result.events.first.type).to eq('WorkflowExecutionCompleted') end end From 59a1f4b841ad186bbe6cfa397e9b40ba4d174b0b Mon Sep 17 00:00:00 2001 From: antstorm Date: Wed, 16 Feb 2022 16:19:38 +0000 Subject: [PATCH 5/7] fixup! Implement a versioned workflow concern --- .../integration/versioned_workflow_spec.rb | 78 ++++++++++++---- examples/workflows/versioned_workflow.rb | 8 +- examples/workflows/versioned_workflow/v1.rb | 7 -- examples/workflows/versioned_workflow/v2.rb | 9 -- examples/workflows/versioned_workflow_v1.rb | 5 ++ examples/workflows/versioned_workflow_v2.rb | 7 ++ lib/cadence/concerns/versioned.rb | 89 +++++++++---------- lib/cadence/execution_options.rb | 10 +++ .../lib/cadence/execution_options_spec.rb | 88 ++++++++++++++++++ 9 files changed, 215 insertions(+), 86 deletions(-) delete mode 100644 examples/workflows/versioned_workflow/v1.rb delete mode 100644 examples/workflows/versioned_workflow/v2.rb create mode 100644 examples/workflows/versioned_workflow_v1.rb create mode 100644 examples/workflows/versioned_workflow_v2.rb diff --git a/examples/spec/integration/versioned_workflow_spec.rb b/examples/spec/integration/versioned_workflow_spec.rb index b39fd94a..f5a8a181 100644 --- a/examples/spec/integration/versioned_workflow_spec.rb +++ b/examples/spec/integration/versioned_workflow_spec.rb @@ -1,36 +1,76 @@ require 'workflows/versioned_workflow' +require 'cadence/json' describe VersionedWorkflow, :integration do - context 'without explicit version' do - it 'executes latest version' do - result = run_workflow(described_class) + context 'when scheduling' do + context 'without explicit version' do + it 'executes the latest version' do + result = run_workflow(described_class) - event = result.events.first + event = result.events.first - expect(event.type).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) - expect(event.attributes.result).to eq('ECHO: version 2') + expect(event.type).to eq('WorkflowExecutionCompleted') + expect(Cadence::JSON.deserialize(event.attributes.result)).to eq('ECHO: version 2') + end end - end - context 'with explicit version' do - it 'executes specified version' do - result = run_workflow(described_class, options: { headers: { 'Version' => '1' }}) + context 'with explicit version' do + let(:options) { { options: { headers: { 'Version' => '1' } } } } + + it 'executes the specified version' do + result = run_workflow(described_class, options) + + event = result.events.first + + expect(event.type).to eq('WorkflowExecutionCompleted') + expect(Cadence::JSON.deserialize(event.attributes.result)).to eq('ECHO: version 1') + end + end - event = result.events.first + context 'with a non-existing version' do + let(:options) { { options: { headers: { 'Version' => '3' } } } } - expect(event.type).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) - expect(event.attributes.result).to eq('ECHO: version 1') + it 'raises an error' do + expect do + run_workflow(described_class, options) + end.to raise_error( + Cadence::Concerns::Versioned::UnknownWorkflowVersion, + 'Unknown version 3 for VersionedWorkflow' + ) + end end end - context 'with a missing version' do - it 'executes default version' do - result = run_workflow(described_class, options: { headers: { 'Version' => nil }}) + context 'when already scheduled' do + context 'without a version' do + it 'executes the default version' do + # starting with a plain string to skip the automatic header setting + result = run_workflow('VersionedWorkflow') + + event = result.events.first + + expect(event.type).to eq('WorkflowExecutionCompleted') + expect(Cadence::JSON.deserialize(event.attributes.result)).to eq('ECHO: default version') + end + end + + context 'with a non-existing version' do + let(:options) do + { + options: { + timeouts: { execution: 1 }, + headers: { 'Version' => '3' } + } + } + end + + it 'times out the workflow' do + result = run_workflow('VersionedWorkflow', options) - event = result.events.first + event = result.events.first - expect(event.type).to eq(CadenceThrift::EventType::WorkflowExecutionCompleted) - expect(event.attributes.result).to eq('ECHO: default version') + expect(event.type).to eq('WorkflowExecutionTimedOut') + end end end end diff --git a/examples/workflows/versioned_workflow.rb b/examples/workflows/versioned_workflow.rb index 8e990ee1..95f31024 100644 --- a/examples/workflows/versioned_workflow.rb +++ b/examples/workflows/versioned_workflow.rb @@ -1,14 +1,14 @@ require 'cadence/concerns/versioned' -require_relative './versioned_workflow/v1' -require_relative './versioned_workflow/v2' +require_relative './versioned_workflow_v1' +require_relative './versioned_workflow_v2' class VersionedWorkflow < Cadence::Workflow include Cadence::Concerns::Versioned headers 'MyHeader' => 'MyValue' - version 1, V1 - version 2, V2 + version 1, VersionedWorkflowV1 + version 2, VersionedWorkflowV2 def execute EchoActivity.execute!('default version') diff --git a/examples/workflows/versioned_workflow/v1.rb b/examples/workflows/versioned_workflow/v1.rb deleted file mode 100644 index e8ccaa9a..00000000 --- a/examples/workflows/versioned_workflow/v1.rb +++ /dev/null @@ -1,7 +0,0 @@ -class VersionedWorkflow < Cadence::Workflow - class V1 < Cadence::Workflow - def execute - EchoActivity.execute!('version 1') - end - end -end diff --git a/examples/workflows/versioned_workflow/v2.rb b/examples/workflows/versioned_workflow/v2.rb deleted file mode 100644 index 4b79df57..00000000 --- a/examples/workflows/versioned_workflow/v2.rb +++ /dev/null @@ -1,9 +0,0 @@ -class VersionedWorkflow < Cadence::Workflow - class V2 < Cadence::Workflow - headers 'MyNewHeader' => 'MyNewValue' - - def execute - EchoActivity.execute!('version 2') - end - end -end diff --git a/examples/workflows/versioned_workflow_v1.rb b/examples/workflows/versioned_workflow_v1.rb new file mode 100644 index 00000000..282118a8 --- /dev/null +++ b/examples/workflows/versioned_workflow_v1.rb @@ -0,0 +1,5 @@ +class VersionedWorkflowV1 < Cadence::Workflow + def execute + EchoActivity.execute!('version 1') + end +end diff --git a/examples/workflows/versioned_workflow_v2.rb b/examples/workflows/versioned_workflow_v2.rb new file mode 100644 index 00000000..47d76303 --- /dev/null +++ b/examples/workflows/versioned_workflow_v2.rb @@ -0,0 +1,7 @@ +class VersionedWorkflowV2 < Cadence::Workflow + headers 'MyNewHeader' => 'MyNewValue' + + def execute + EchoActivity.execute!('version 2') + end +end diff --git a/lib/cadence/concerns/versioned.rb b/lib/cadence/concerns/versioned.rb index 6d13ec53..416c6894 100644 --- a/lib/cadence/concerns/versioned.rb +++ b/lib/cadence/concerns/versioned.rb @@ -12,63 +12,66 @@ def self.included(base) class UnknownWorkflowVersion < Cadence::ClientError; end - module ClassMethods - def execute_in_context(context, input) - target_version = context.headers[VERSION_HEADER_NAME].to_i - workflow_class = version_class_for(target_version) - - if !workflow_class - raise UnknownWorkflowVersion, "Unknown version #{target_version} for #{self.name}" - end - - super + class Workflow + def initialize(main_class, headers) + version = headers.fetch(VERSION_HEADER_NAME, main_class.latest_version).to_i + version_class = main_class.version_class_for(version) + + @version = version + @main_class = main_class + @version_class = version_class end - def version(number, workflow_class) - versions[number] = workflow_class + def domain + version_class.domain || main_class.domain end - def domain(*args) - return version_class_for(latest_version).domain || @domain if args.empty? - @domain = args.first + def task_list + version_class.task_list || main_class.task_list end - def task_list(*args) - return version_class_for(latest_version).task_list || @task_list if args.empty? - @task_list = args.first + def retry_policy + version_class.retry_policy || main_class.retry_policy end - def retry_policy(*args) - return version_class_for(latest_version).retry_policy || @retry_policy if args.empty? - @retry_policy = Cadence::RetryPolicy.new(args.first) - @retry_policy.validate! + def timeouts + version_class.timeouts || main_class.timeouts end - def timeouts(*args) - return version_class_for(latest_version).timeouts || @timeouts if args.empty? - @timeouts = args.first + def headers + (version_class.headers || main_class.headers || {}).merge(VERSION_HEADER_NAME => version.to_s) end - def headers(*args) - if args.empty? - headers = version_class_for(latest_version).headers || @headers + private - if headers.key?(VERSION_HEADER_NAME) - warn "[WARNING] #{VERSION_HEADER_NAME} header collision" - end + attr_reader :version, :main_class, :version_class + end - return headers.merge(VERSION_HEADER_NAME => latest_version.to_s) - end + module ClassMethods + def version(number, workflow_class) + versions[number] = workflow_class + end - super + def execute_in_context(context, input) + version = context.headers.fetch(VERSION_HEADER_NAME, DEFAULT_VERSION).to_i + version_class = version_class_for(version) + + if self == version_class + super + else + # forward the method call to the target version class + version_class.execute_in_context(context, input) + end end - def new(context) - target_version = context.headers[VERSION_HEADER_NAME].to_i - workflow_class = version_class_for(target_version) + def version_class_for(version) + versions.fetch(version.to_i) do + raise UnknownWorkflowVersion, "Unknown version #{version} for #{self.name}" + end + end - # Swap top-level class with version-specific class - workflow_class.new(context) + def latest_version + versions.keys.max end private @@ -77,14 +80,6 @@ def versions # Initialize with the default version @versions ||= { DEFAULT_VERSION => self } end - - def version_class_for(version) - versions[version] - end - - def latest_version - versions.keys.max - end end end end diff --git a/lib/cadence/execution_options.rb b/lib/cadence/execution_options.rb index ab4ab226..6f8c5796 100644 --- a/lib/cadence/execution_options.rb +++ b/lib/cadence/execution_options.rb @@ -1,4 +1,5 @@ require 'cadence/concerns/executable' +require 'cadence/concerns/versioned' require 'cadence/retry_policy' module Cadence @@ -17,6 +18,9 @@ def initialize(object, options, defaults = nil) # For Cadence::Workflow and Cadence::Activity use defined values as the next option if object.singleton_class.included_modules.include?(Concerns::Executable) + # In a versioned workflow merge the specific version options with default workflow options + object = Concerns::Versioned::Workflow.new(object, @headers) if versioned?(object) + @domain ||= object.domain @task_list ||= object.task_list @retry_policy = object.retry_policy.merge(@retry_policy) if object.retry_policy @@ -41,5 +45,11 @@ def initialize(object, options, defaults = nil) freeze end + + private + + def versioned?(workflow) + workflow.singleton_class.included_modules.include?(Concerns::Versioned::ClassMethods) + end end end diff --git a/spec/unit/lib/cadence/execution_options_spec.rb b/spec/unit/lib/cadence/execution_options_spec.rb index 1d82720a..46bc4caf 100644 --- a/spec/unit/lib/cadence/execution_options_spec.rb +++ b/spec/unit/lib/cadence/execution_options_spec.rb @@ -1,5 +1,7 @@ require 'cadence/execution_options' require 'cadence/configuration' +require 'cadence/workflow' +require 'cadence/concerns/versioned' describe Cadence::ExecutionOptions do subject { described_class.new(object, options, defaults) } @@ -195,4 +197,90 @@ class TestWorkflow < Cadence::Workflow end end end + + context 'when initialized with a Versioned workflow' do + class TestVersionedWorkflowV1 < Cadence::Workflow + retry_policy interval: 5, backoff: 1, max_attempts: 2 + timeouts execution: 1 + end + + class TestVersionedWorkflowV2 < Cadence::Workflow + domain 'new-domain' + task_list 'new-task-list' + headers 'HeaderV2' => 'TestV2' + end + + class TestVersionedWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + domain 'domain' + task_list 'task-list' + retry_policy interval: 1, backoff: 2, max_attempts: 5 + timeouts start_to_close: 10 + headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB' + + version 1, TestVersionedWorkflowV1 + version 2, TestVersionedWorkflowV2 + end + + let(:object) { TestVersionedWorkflow } + let(:options) { {} } + + context 'when initialized without the version header' do + it 'is initialized with a mix of latest version and default version values' do + expect(subject.name).to eq(object.name) + expect(subject.domain).to eq('new-domain') + expect(subject.task_list).to eq('new-task-list') + expect(subject.retry_policy).to be_an_instance_of(Cadence::RetryPolicy) + expect(subject.retry_policy.interval).to eq(1) + expect(subject.retry_policy.backoff).to eq(2) + expect(subject.retry_policy.max_attempts).to eq(5) + expect(subject.timeouts).to eq(start_to_close: 10) + expect(subject.headers).to eq( + 'HeaderV2' => 'TestV2', + 'Version' => '2' + ) + end + end + + context 'when initialized with the version header' do + let(:options) { { headers: { 'Version' => '1' } } } + + it 'is initialized with a mix of specified version and default version values' do + expect(subject.name).to eq(object.name) + expect(subject.domain).to eq('domain') + expect(subject.task_list).to eq('task-list') + expect(subject.retry_policy).to be_an_instance_of(Cadence::RetryPolicy) + expect(subject.retry_policy.interval).to eq(5) + expect(subject.retry_policy.backoff).to eq(1) + expect(subject.retry_policy.max_attempts).to eq(2) + expect(subject.timeouts).to eq(execution: 1) + expect(subject.headers).to eq( + 'HeaderA' => 'TestA', + 'HeaderB' => 'TestB', + 'Version' => '1' + ) + end + end + + context 'when initialized with the default version' do + let(:options) { { headers: { 'Version' => '0' } } } + + it 'is initialized with a default version values' do + expect(subject.name).to eq(object.name) + expect(subject.domain).to eq('domain') + expect(subject.task_list).to eq('task-list') + expect(subject.retry_policy).to be_an_instance_of(Cadence::RetryPolicy) + expect(subject.retry_policy.interval).to eq(1) + expect(subject.retry_policy.backoff).to eq(2) + expect(subject.retry_policy.max_attempts).to eq(5) + expect(subject.timeouts).to eq(start_to_close: 10) + expect(subject.headers).to eq( + 'HeaderA' => 'TestA', + 'HeaderB' => 'TestB', + 'Version' => '0' + ) + end + end + end end From deab24ee22ba4b892a72315e892e5efcbfc60d3e Mon Sep 17 00:00:00 2001 From: antstorm Date: Fri, 18 Feb 2022 14:50:11 +0000 Subject: [PATCH 6/7] fixup! Implement a versioned workflow concern --- .../integration/versioned_workflow_spec.rb | 4 +- lib/cadence/concerns/versioned.rb | 35 ++- lib/cadence/execution_options.rb | 2 +- lib/cadence/worker.rb | 8 +- .../lib/cadence/concerns/versioned_spec.rb | 237 ++++++++++++++++++ .../lib/cadence/execution_options_spec.rb | 20 +- 6 files changed, 282 insertions(+), 24 deletions(-) create mode 100644 spec/unit/lib/cadence/concerns/versioned_spec.rb diff --git a/examples/spec/integration/versioned_workflow_spec.rb b/examples/spec/integration/versioned_workflow_spec.rb index f5a8a181..b4567853 100644 --- a/examples/spec/integration/versioned_workflow_spec.rb +++ b/examples/spec/integration/versioned_workflow_spec.rb @@ -15,7 +15,7 @@ end context 'with explicit version' do - let(:options) { { options: { headers: { 'Version' => '1' } } } } + let(:options) { { options: { version: 1 } } } it 'executes the specified version' do result = run_workflow(described_class, options) @@ -28,7 +28,7 @@ end context 'with a non-existing version' do - let(:options) { { options: { headers: { 'Version' => '3' } } } } + let(:options) { { options: { version: 3 } } } it 'raises an error' do expect do diff --git a/lib/cadence/concerns/versioned.rb b/lib/cadence/concerns/versioned.rb index 416c6894..8c4391c7 100644 --- a/lib/cadence/concerns/versioned.rb +++ b/lib/cadence/concerns/versioned.rb @@ -13,8 +13,10 @@ def self.included(base) class UnknownWorkflowVersion < Cadence::ClientError; end class Workflow - def initialize(main_class, headers) - version = headers.fetch(VERSION_HEADER_NAME, main_class.latest_version).to_i + attr_reader :version, :main_class, :version_class + + def initialize(main_class, version = nil) + version ||= main_class.pick_version version_class = main_class.version_class_for(version) @version = version @@ -23,11 +25,21 @@ def initialize(main_class, headers) end def domain - version_class.domain || main_class.domain + if version_class.domain + warn '[WARNING] Overriding domain in a workflow version is not yet supported. ' \ + "Called from #{version_class}." + end + + main_class.domain end def task_list - version_class.task_list || main_class.task_list + if version_class.task_list + warn '[WARNING] Overriding task_list in a workflow version is not yet supported. ' \ + "Called from #{version_class}." + end + + main_class.task_list end def retry_policy @@ -41,10 +53,6 @@ def timeouts def headers (version_class.headers || main_class.headers || {}).merge(VERSION_HEADER_NAME => version.to_s) end - - private - - attr_reader :version, :main_class, :version_class end module ClassMethods @@ -70,12 +78,19 @@ def version_class_for(version) end end - def latest_version - versions.keys.max + def pick_version + version_picker.call(versions.keys.max) end private + DEFAULT_VERSION_PICKER = lambda { |latest_version| latest_version } + + def version_picker(&block) + return @version_picker || DEFAULT_VERSION_PICKER unless block_given? + @version_picker = block + end + def versions # Initialize with the default version @versions ||= { DEFAULT_VERSION => self } diff --git a/lib/cadence/execution_options.rb b/lib/cadence/execution_options.rb index 6f8c5796..82a6cb9d 100644 --- a/lib/cadence/execution_options.rb +++ b/lib/cadence/execution_options.rb @@ -19,7 +19,7 @@ def initialize(object, options, defaults = nil) # For Cadence::Workflow and Cadence::Activity use defined values as the next option if object.singleton_class.included_modules.include?(Concerns::Executable) # In a versioned workflow merge the specific version options with default workflow options - object = Concerns::Versioned::Workflow.new(object, @headers) if versioned?(object) + object = Concerns::Versioned::Workflow.new(object, options[:version]) if versioned?(object) @domain ||= object.domain @task_list ||= object.task_list diff --git a/lib/cadence/worker.rb b/lib/cadence/worker.rb index a3632950..5b1cb23d 100644 --- a/lib/cadence/worker.rb +++ b/lib/cadence/worker.rb @@ -3,6 +3,7 @@ require 'cadence/execution_options' require 'cadence/executable_lookup' require 'cadence/middleware/entry' +require 'cadence/concerns/versioned' module Cadence class Worker @@ -18,7 +19,12 @@ def initialize(config = Cadence.configuration, **options) end def register_workflow(workflow_class, options = {}) - execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) + execution_options = ExecutionOptions.new( + workflow_class, + # ensure default version to avoid executing version picker in worker context + options.merge(version: Cadence::Concerns::Versioned::DEFAULT_VERSION), + config.default_execution_options + ) key = [execution_options.domain, execution_options.task_list] @workflows[key].add(execution_options.name, workflow_class) diff --git a/spec/unit/lib/cadence/concerns/versioned_spec.rb b/spec/unit/lib/cadence/concerns/versioned_spec.rb new file mode 100644 index 00000000..c8e2fd7c --- /dev/null +++ b/spec/unit/lib/cadence/concerns/versioned_spec.rb @@ -0,0 +1,237 @@ +require 'cadence/concerns/versioned' +require 'cadence/workflow/context' + +describe Cadence::Concerns::Versioned do + class TestVersionedWorkflowV1 < Cadence::Workflow + end + + class TestVersionedWorkflowV2 < Cadence::Workflow + domain 'new-domain' + task_list 'new-task-list' + retry_policy interval: 5, backoff: 1, max_attempts: 2 + timeouts execution: 1 + headers 'HeaderV2' => 'TestV2' + end + + class TestVersionedWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + domain 'domain' + task_list 'task-list' + retry_policy interval: 1, backoff: 2, max_attempts: 5 + timeouts start_to_close: 10 + headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB' + + version 1, TestVersionedWorkflowV1 + version 2, TestVersionedWorkflowV2 + end + + describe described_class::Workflow do + subject { described_class.new(TestVersionedWorkflow, version) } + + describe '#initialize' do + before { allow(TestVersionedWorkflow).to receive(:pick_version).and_call_original } + + context 'when passed no version header' do + let(:version) { nil } + + it 'initializes the latest version' do + expect(subject.version).to eq(2) + expect(subject.main_class).to eq(TestVersionedWorkflow) + expect(subject.version_class).to eq(TestVersionedWorkflowV2) + end + + it 'calls version picker' do + subject + + expect(TestVersionedWorkflow).to have_received(:pick_version) + end + end + + context 'when passed a specific version header' do + let(:version) { 1 } + + it 'initializes the specified version' do + expect(subject.version).to eq(1) + expect(subject.main_class).to eq(TestVersionedWorkflow) + expect(subject.version_class).to eq(TestVersionedWorkflowV1) + expect(TestVersionedWorkflow).not_to have_received(:pick_version) + end + end + + context 'when passed a non-existing version' do + let(:version) { 3 } + + it 'raises UnknownWorkflowVersion' do + expect { subject }.to raise_error( + Cadence::Concerns::Versioned::UnknownWorkflowVersion, + 'Unknown version 3 for TestVersionedWorkflow' + ) + expect(TestVersionedWorkflow).not_to have_received(:pick_version) + end + end + end + + context 'when version does not override the attributes' do + let(:version) { 1 } + + before { allow(subject).to receive(:warn) } + + describe '#domain' do + it 'returns default version domain' do + expect(subject.domain).to eq('domain') + expect(subject).not_to have_received(:warn) + end + end + + describe '#task_list' do + it 'returns default version task_list' do + expect(subject.task_list).to eq('task-list') + expect(subject).not_to have_received(:warn) + end + end + + describe '#retry_policy' do + it 'returns default version retry_policy' do + expect(subject.retry_policy).to eq(interval: 1, backoff: 2, max_attempts: 5) + end + end + + describe '#timeouts' do + it 'returns default version timeouts' do + expect(subject.timeouts).to eq(start_to_close: 10) + end + end + + describe '#headers' do + it 'returns default version headers including version header' do + expect(subject.headers).to eq( + 'HeaderA' => 'TestA', + 'HeaderB' => 'TestB', + Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '1' + ) + end + end + end + + context 'when version overwrites the attribute' do + let(:version) { 2 } + + before { allow(subject).to receive(:warn) } + + describe '#domain' do + it 'returns default version domain and warns' do + expect(subject.domain).to eq('domain') + expect(subject) + .to have_received(:warn) + .with('[WARNING] Overriding domain in a workflow version is not yet supported. ' \ + 'Called from TestVersionedWorkflowV2.') + end + end + + describe '#task_list' do + it 'returns default version task_list and warns' do + expect(subject.task_list).to eq('task-list') + expect(subject) + .to have_received(:warn) + .with('[WARNING] Overriding task_list in a workflow version is not yet supported. ' \ + 'Called from TestVersionedWorkflowV2.') + end + end + + describe '#retry_policy' do + it 'returns overriden retry_policy' do + expect(subject.retry_policy).to eq(interval: 5, backoff: 1, max_attempts: 2) + end + end + + describe '#timeouts' do + it 'returns overriden timeouts' do + expect(subject.timeouts).to eq(execution: 1) + end + end + + describe '#headers' do + it 'returns overriden headers including version header' do + expect(subject.headers).to eq( + 'HeaderV2' => 'TestV2', + Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '2' + ) + end + end + end + end + + describe described_class::ClassMethods do + subject { TestVersionedWorkflow } + + describe '.version' do + after { TestVersionedWorkflow.send(:versions).delete(4) } + + it 'adds a new version' do + subject.version(4, TestVersionedWorkflowV1) + + expect(subject.version_class_for(4)).to eq(TestVersionedWorkflowV1) + end + end + + describe '.execute_in_context' do + let(:context) { instance_double(Cadence::Workflow::Context, headers: headers) } + + context 'when called with a non-default version' do + let(:headers) { { Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '2' } } + before { allow(TestVersionedWorkflowV2).to receive(:execute_in_context) } + + it 'calls version' do + subject.execute_in_context(context, nil) + + expect(TestVersionedWorkflowV2).to have_received(:execute_in_context).with(context, nil) + end + end + end + + describe '.version_class_for' do + context 'when given a valid version' do + it 'returns version class' do + expect(subject.version_class_for(2)).to eq(TestVersionedWorkflowV2) + end + end + + context 'when given a default version' do + it 'returns default version class' do + expect(subject.version_class_for(Cadence::Concerns::Versioned::DEFAULT_VERSION)) + .to eq(TestVersionedWorkflow) + end + end + + context 'when given an invalid version' do + it 'raises UnknownWorkflowVersion' do + expect { subject.version_class_for(3) }.to raise_error( + Cadence::Concerns::Versioned::UnknownWorkflowVersion, + 'Unknown version 3 for TestVersionedWorkflow' + ) + end + end + end + + describe '.pick_version' do + class TestPickVersionWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + version_picker { |latest_version| latest_version + 42 } + end + + context 'when using default version picker' do + it 'returns the latest version' do + expect(TestVersionedWorkflow.pick_version).to eq(2) + end + end + + context 'when using overriden version picker' do + it 'returns a custom version' do + expect(TestPickVersionWorkflow.pick_version).to eq(42) + end + end + end + end +end diff --git a/spec/unit/lib/cadence/execution_options_spec.rb b/spec/unit/lib/cadence/execution_options_spec.rb index 46bc4caf..81accfc7 100644 --- a/spec/unit/lib/cadence/execution_options_spec.rb +++ b/spec/unit/lib/cadence/execution_options_spec.rb @@ -199,18 +199,18 @@ class TestWorkflow < Cadence::Workflow end context 'when initialized with a Versioned workflow' do - class TestVersionedWorkflowV1 < Cadence::Workflow + class TestVersionedExecutionOptionsWorkflowV1 < Cadence::Workflow retry_policy interval: 5, backoff: 1, max_attempts: 2 timeouts execution: 1 end - class TestVersionedWorkflowV2 < Cadence::Workflow + class TestVersionedExecutionOptionsWorkflowV2 < Cadence::Workflow domain 'new-domain' task_list 'new-task-list' headers 'HeaderV2' => 'TestV2' end - class TestVersionedWorkflow < Cadence::Workflow + class TestVersionedExecutionOptionsWorkflow < Cadence::Workflow include Cadence::Concerns::Versioned domain 'domain' @@ -219,18 +219,18 @@ class TestVersionedWorkflow < Cadence::Workflow timeouts start_to_close: 10 headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB' - version 1, TestVersionedWorkflowV1 - version 2, TestVersionedWorkflowV2 + version 1, TestVersionedExecutionOptionsWorkflowV1 + version 2, TestVersionedExecutionOptionsWorkflowV2 end - let(:object) { TestVersionedWorkflow } + let(:object) { TestVersionedExecutionOptionsWorkflow } let(:options) { {} } context 'when initialized without the version header' do it 'is initialized with a mix of latest version and default version values' do expect(subject.name).to eq(object.name) - expect(subject.domain).to eq('new-domain') - expect(subject.task_list).to eq('new-task-list') + expect(subject.domain).to eq('domain') + expect(subject.task_list).to eq('task-list') expect(subject.retry_policy).to be_an_instance_of(Cadence::RetryPolicy) expect(subject.retry_policy.interval).to eq(1) expect(subject.retry_policy.backoff).to eq(2) @@ -244,7 +244,7 @@ class TestVersionedWorkflow < Cadence::Workflow end context 'when initialized with the version header' do - let(:options) { { headers: { 'Version' => '1' } } } + let(:options) { { version: 1 } } it 'is initialized with a mix of specified version and default version values' do expect(subject.name).to eq(object.name) @@ -264,7 +264,7 @@ class TestVersionedWorkflow < Cadence::Workflow end context 'when initialized with the default version' do - let(:options) { { headers: { 'Version' => '0' } } } + let(:options) { { version: 0 } } it 'is initialized with a default version values' do expect(subject.name).to eq(object.name) From 7a2c71b219771cd898183204f6bcb2eae56e9869 Mon Sep 17 00:00:00 2001 From: antstorm Date: Fri, 18 Feb 2022 15:23:34 +0000 Subject: [PATCH 7/7] Update README --- README.md | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 86 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 4c8e97cc..7aaa1c0a 100644 --- a/README.md +++ b/README.md @@ -397,11 +397,94 @@ however `ActivityNew3` will get executed, since the release wasn't yet checked a every new execution of the workflow — all new activities will get executed, while `ActivityOld` will not. -Later on you can clean it up and drop all the checks if you don't have any older workflows running -or expect them to ever be executed (e.g. reset). - *NOTE: Releases with different names do not depend on each other in any way.* +## Extras + +This section describes optional extra modules included in the SDK for convenience and some +additional functionality. + +### Versioned Workflows + +Implemnting breaking changes using the previously described `#has_release?` flag can be error prone +and results in a condition build up in workflows over time. + +Another way of implementing breaking changes is by doing a full cut over to the new version every +time you need to modify a workflow. This can be achieved manually by treating new versions as +separate workflows. We've simplified this process by making your workflow aware of its versions: + +```ruby +require 'cadence/concerns/versioned' + +class MyWorkflowV1 < Cadence::Workflow + retry_policy max_attempts: 5 + + def execute + Activity2.execute! + end +end + +class MyWorkflowV2 < Cadence::Workflow + timeouts execution: 60 + + def execute + Activity2.execute! + Activity3.execute! + end +end + +class MyWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + version 1, MyWorkflowV1 + version 2, MyWorkflowV2 + + def execute + Activity1.execute! + end +end +``` + +This way you don't need to make any changes to the invocation of your workflows — calling +`Cadence.start_workflow(MyWorkflow)` will resolve the latest version and schedule `MyWorkflowV2`. +It will still appear as if you're executing `MyWorkflow` from the Cadence UI, metrics, logging, etc. +This approach allows you to easily extend your existing workflows without changing anything outside +of your workflow. + +When making a workflow versioned the main class (e.g. `MyWorkflow`) becomes the default version. +Once a workflow was scheduled its version will remain unchanged, so all the previously executed +workflows will be executed using the default implementation. Newly scheduled workflows will pick the +latest available version, but you can specify a version like this: + +```ruby +Cadence.start_workflow(MyWorkflow, options: { version: 1 }) +``` + +Once all the old versions are no longer in use you can remove those files and drop their `version` +definitions (just make sure not to change the numbers for versions that are in active use). + +In case you want to do a gradual rollout you can override the version picker with your own +implementation: + + +```ruby +class MyWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + version 1, MyWorkflowV1 + version 2, MyWorkflowV2 + version_picker do |_latest_version| + if my_feature_flag? + 2 + else + 1 + end + end + + ... +end +``` + ## Testing It is crucial to properly test your workflows and activities before running them in production. The