From d83be7a578059b6bf8b629c4cf92e08a3e9da3a3 Mon Sep 17 00:00:00 2001 From: Fernando Almeida Date: Mon, 31 Dec 2018 17:04:04 +0000 Subject: [PATCH 1/2] Augmented IMessageSource in a backward compatible manner to support the specification of an external IScheduler to tweak concurrent execution and enable richer testing using TestScheduler. Upgraded all possible package dependencies without breaking support for currently targetted frameworks. Revised test definitions to accommodate the change in IMessageSource. --- Obvs.Tests/Obvs.Tests.csproj | 9 +- Obvs.Tests/TestMergedMessageSource.cs | 165 ++++++++++++----------- Obvs.Tests/TestMessageSourceConverter.cs | 55 +++++--- Obvs/IMessageSource.cs | 31 ++++- Obvs/MergedMessageSource.cs | 29 ++-- Obvs/MessageSourceConverter.cs | 19 ++- Obvs/Obvs.csproj | 4 +- Obvs/SubjectMessageBus.cs | 23 +++- 8 files changed, 196 insertions(+), 139 deletions(-) diff --git a/Obvs.Tests/Obvs.Tests.csproj b/Obvs.Tests/Obvs.Tests.csproj index d357150..c83b442 100644 --- a/Obvs.Tests/Obvs.Tests.csproj +++ b/Obvs.Tests/Obvs.Tests.csproj @@ -5,13 +5,14 @@ Copyright © Christopher Read 2014 false + true - - + + - - + + diff --git a/Obvs.Tests/TestMergedMessageSource.cs b/Obvs.Tests/TestMergedMessageSource.cs index e9da4fb..86cec14 100644 --- a/Obvs.Tests/TestMergedMessageSource.cs +++ b/Obvs.Tests/TestMergedMessageSource.cs @@ -1,43 +1,50 @@ using System; +using System.Reactive.Concurrency; using System.Reactive.Linq; + using FakeItEasy; + using Obvs.Types; + using Xunit; -namespace Obvs.Tests -{ - public class TestMergedMessageSource - { +namespace Obvs.Tests { + + public class TestMergedMessageSource { [Fact] - public void ShouldOnlySubscribeToUnderlyingSourcesOnce() - { + public void ShouldOnlySubscribeToUnderlyingSourcesOnce() { IMessageSource source1 = A.Fake>(); IMessageSource source2 = A.Fake>(); IObservable observable1 = A.Fake>(); IObservable observable2 = A.Fake>(); IObserver observer = A.Fake>(); - - A.CallTo(() => source1.Messages).Returns(observable1); - A.CallTo(() => source2.Messages).Returns(observable2); - - MergedMessageSource mergedMessageSource = new MergedMessageSource(new[] { source1, source2 }); - - IDisposable sub1 = mergedMessageSource.Messages.OfType().Subscribe(observer); - IDisposable sub2 = mergedMessageSource.Messages.OfType().Subscribe(observer); - - A.CallTo(() => source1.Messages).MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => observable1.Subscribe(A>._)).MustHaveHappened(Repeated.Exactly.Once); - - A.CallTo(() => source2.Messages).MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => observable2.Subscribe(A>._)).MustHaveHappened(Repeated.Exactly.Once); - + var scheduler = A.Fake(); + + A.CallTo(() => source1.GetMessages(A._)).Returns(observable1); + A.CallTo(() => source2.GetMessages(A._)).Returns(observable2); + + MergedMessageSource mergedMessageSource = new MergedMessageSource(new [] { source1, source2 }); + + var mergedMessagesObservable = mergedMessageSource.GetMessages(scheduler); + IDisposable sub1 = mergedMessagesObservable.OfType().Subscribe(observer); + IDisposable sub2 = mergedMessagesObservable.OfType().Subscribe(observer); + + A.CallTo(() => source1.GetMessages(A._)) + .MustHaveHappened(Repeated.Exactly.Once); + A.CallTo(() => observable1.Subscribe(A>._)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => source2.GetMessages(A._)) + .MustHaveHappened(Repeated.Exactly.Once); + A.CallTo(() => observable2.Subscribe(A>._)) + .MustHaveHappened(Repeated.Exactly.Once); + sub1.Dispose(); sub2.Dispose(); } [Fact] - public void ShouldReturnMessagesFromUnderlyingSources() - { + public void ShouldReturnMessagesFromUnderlyingSources() { IMessageSource source1 = A.Fake>(); IMessageSource source2 = A.Fake>(); IObservable observable1 = A.Fake>(); @@ -45,76 +52,80 @@ public void ShouldReturnMessagesFromUnderlyingSources() IObserver observer = A.Fake>(); IObserver internalObserver1 = null; IObserver internalObserver2 = null; + var scheduler = A.Fake(); - A.CallTo(() => source1.Messages).Returns(observable1); - A.CallTo(() => source2.Messages).Returns(observable2); - A.CallTo(() => observable1.Subscribe(A>._)).Invokes(call => internalObserver1 = call.GetArgument>(0)); - A.CallTo(() => observable2.Subscribe(A>._)).Invokes(call => internalObserver2 = call.GetArgument>(0)); + A.CallTo(() => source1.GetMessages(A._)).Returns(observable1); + A.CallTo(() => source2.GetMessages(A._)).Returns(observable2); + A.CallTo(() => observable1.Subscribe(A>._)) + .Invokes(call => internalObserver1 = call.GetArgument>(0)); + A.CallTo(() => observable2.Subscribe(A>._)) + .Invokes(call => internalObserver2 = call.GetArgument>(0)); - MergedMessageSource mergedMessageSource = new MergedMessageSource(new[] { source1, source2 }); + MergedMessageSource mergedMessageSource = new MergedMessageSource(new [] { source1, source2 }); + var mergedMessagesObservable = mergedMessageSource.GetMessages(scheduler); + var sub1 = mergedMessagesObservable.OfType().Subscribe(observer); + var sub2 = mergedMessagesObservable.OfType().Subscribe(observer); - IDisposable sub1 = mergedMessageSource.Messages.OfType().Subscribe(observer); - IDisposable sub2 = mergedMessageSource.Messages.OfType().Subscribe(observer); + Assert.NotNull(internalObserver1); + Assert.NotNull(internalObserver2); - Assert.NotNull(internalObserver1); - Assert.NotNull(internalObserver2); + IEvent ev1 = A.Fake(); + IMessage msg2 = A.Fake(); + internalObserver1.OnNext(ev1); + internalObserver2.OnNext(msg2); - IEvent ev1 = A.Fake(); - IMessage msg2 = A.Fake(); - internalObserver1.OnNext(ev1); - internalObserver2.OnNext(msg2); + A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice); + A.CallTo(() => observer.OnNext(msg2)).MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice); - A.CallTo(() => observer.OnNext(msg2)).MustHaveHappened(Repeated.Exactly.Once); - - sub1.Dispose(); - sub2.Dispose(); - } + sub1.Dispose(); + sub2.Dispose(); + } - [Fact] - public void ShouldDisposeUnderlyingSubscriptionOnlyWhenAllSubscriptionsDisposed() - { - IMessageSource source1 = A.Fake>(); - IMessageSource source2 = A.Fake>(); - IObservable observable1 = A.Fake>(); - IObservable observable2 = A.Fake>(); - IObserver observer = A.Fake>(); - IObserver internalObserver1 = null; - IObserver internalObserver2 = null; + [Fact] + public void ShouldDisposeUnderlyingSubscriptionOnlyWhenAllSubscriptionsDisposed() { + IMessageSource source1 = A.Fake>(); + IMessageSource source2 = A.Fake>(); + IObservable observable1 = A.Fake>(); + IObservable observable2 = A.Fake>(); + IObserver observer = A.Fake>(); + IObserver internalObserver1 = null; + IObserver internalObserver2 = null; + var scheduler = A.Fake(); - A.CallTo(() => source1.Messages).Returns(observable1); - A.CallTo(() => source2.Messages).Returns(observable2); - A.CallTo(() => observable1.Subscribe(A>._)).Invokes(call => internalObserver1 = call.GetArgument>(0)); - A.CallTo(() => observable2.Subscribe(A>._)).Invokes(call => internalObserver2 = call.GetArgument>(0)); + A.CallTo(() => source1.GetMessages(A._)).Returns(observable1); + A.CallTo(() => source2.GetMessages(A._)).Returns(observable2); + A.CallTo(() => observable1.Subscribe(A>._)).Invokes(call => internalObserver1 = call.GetArgument>(0)); + A.CallTo(() => observable2.Subscribe(A>._)).Invokes(call => internalObserver2 = call.GetArgument>(0)); - MergedMessageSource mergedMessageSource = new MergedMessageSource(new[] { source1, source2 }); + MergedMessageSource mergedMessageSource = new MergedMessageSource(new [] { source1, source2 }); - IDisposable sub1 = mergedMessageSource.Messages.OfType().Subscribe(observer); - IDisposable sub2 = mergedMessageSource.Messages.OfType().Subscribe(observer); + var mergedMessagesObservable = mergedMessageSource.GetMessages(scheduler); + IDisposable sub1 = mergedMessagesObservable.OfType().Subscribe(observer); + IDisposable sub2 = mergedMessagesObservable.OfType().Subscribe(observer); - Assert.NotNull(internalObserver1); - Assert.NotNull(internalObserver2); + Assert.NotNull(internalObserver1); + Assert.NotNull(internalObserver2); - IEvent ev1 = A.Fake(); - IMessage msg1 = A.Fake(); - IMessage msg2 = A.Fake(); + IEvent ev1 = A.Fake(); + IMessage msg1 = A.Fake(); + IMessage msg2 = A.Fake(); - internalObserver1.OnNext(ev1); - A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice); + internalObserver1.OnNext(ev1); + A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice); - // dispose of first subscription - sub1.Dispose(); + // dispose of first subscription + sub1.Dispose(); - // second subscription should still be active - internalObserver2.OnNext(msg1); - A.CallTo(() => observer.OnNext(msg1)).MustHaveHappened(Repeated.Exactly.Once); + // second subscription should still be active + internalObserver2.OnNext(msg1); + A.CallTo(() => observer.OnNext(msg1)).MustHaveHappened(Repeated.Exactly.Once); - // dispose of second subscription - sub2.Dispose(); + // dispose of second subscription + sub2.Dispose(); - // no subscriptions should be active - internalObserver2.OnNext(msg2); - A.CallTo(() => observer.OnNext(msg2)).MustNotHaveHappened(); - } + // no subscriptions should be active + internalObserver2.OnNext(msg2); + A.CallTo(() => observer.OnNext(msg2)).MustNotHaveHappened(); } } +} \ No newline at end of file diff --git a/Obvs.Tests/TestMessageSourceConverter.cs b/Obvs.Tests/TestMessageSourceConverter.cs index 3b881a8..4c7c7bf 100644 --- a/Obvs.Tests/TestMessageSourceConverter.cs +++ b/Obvs.Tests/TestMessageSourceConverter.cs @@ -1,39 +1,47 @@ using System; +using System.Reactive.Concurrency; + using FakeItEasy; + using Obvs.Types; + using Xunit; -namespace Obvs.Tests -{ - - public class TestMessageSourceConverter - { +namespace Obvs.Tests { + + public class TestMessageSourceConverter { public class TestMessage : IMessage { } [Fact] - public void ShouldSubscribeToUnderlyingSourceOnSubscribe() - { + public void ShouldSubscribeToUnderlyingSourceOnSubscribe() { IMessageSource source = A.Fake>(); + IObservable sourceMessagesObservable = A.Fake>(); IMessageSource sourceConverter = new MessageSourceConverter(source, A.Fake>()); + IScheduler scheduler = A.Fake(); IObserver consumer = A.Fake>(); - sourceConverter.Messages.Subscribe(consumer); + A.CallTo(() => source.GetMessages(A._)).Returns(sourceMessagesObservable); + sourceConverter.GetMessages(scheduler).Subscribe(consumer); - A.CallTo(() => source.Messages.Subscribe(A>.Ignored)).MustHaveHappened(Repeated.Exactly.Once); + A.CallTo(() => source.GetMessages(scheduler).Subscribe(A>.Ignored)) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] - public void ShouldConvertAndPublishMessages() - { - IMessageSource source = A.Fake>(); - IMessageConverter converter = A.Fake>(); - IMessageSource sourceConverter = new MessageSourceConverter(source, converter); + public void ShouldConvertAndPublishMessages() { + var source = A.Fake>(); + var sourceMessagesObservable = A.Fake>(); + var converter = A.Fake>(); + var sourceConverter = new MessageSourceConverter(source, converter); IObserver internalObserver = null; - IObserver consumer = A.Fake>(); - TestMessage message = new TestMessage(); - TestMessage convertedMessage = new TestMessage(); + var consumer = A.Fake>(); + var scheduler = A.Fake(); + var message = new TestMessage(); + var convertedMessage = new TestMessage(); - A.CallTo(() => source.Messages.Subscribe(A>.Ignored)).Invokes(call => internalObserver = call.GetArgument>(0)); + A.CallTo(() => source.GetMessages(A._)).Returns(sourceMessagesObservable); + A.CallTo(() => source.GetMessages(scheduler).Subscribe(A>.Ignored)) + .Invokes(call => internalObserver = call.GetArgument>(0)); A.CallTo(() => converter.Convert(message)).Returns(convertedMessage); sourceConverter.Messages.Subscribe(consumer); @@ -45,16 +53,19 @@ public void ShouldConvertAndPublishMessages() } [Fact] - public void ShouldNotPublishInvalidMessages() - { + public void ShouldNotPublishInvalidMessages() { IMessageSource source = A.Fake>(); + IObservable sourceMessagesObservable = A.Fake>(); IMessageConverter converter = A.Fake>(); IMessageSource sourceConverter = new MessageSourceConverter(source, converter); IObserver internalObserver = null; IObserver consumer = A.Fake>(); + IScheduler scheduler = A.Fake(); TestMessage message = new TestMessage(); - - A.CallTo(() => source.Messages.Subscribe(A>.Ignored)).Invokes(call => internalObserver = call.GetArgument>(0)); + + A.CallTo(() => source.GetMessages(A._)).Returns(sourceMessagesObservable); + A.CallTo(() => source.GetMessages(scheduler).Subscribe(A>.Ignored)) + .Invokes(call => internalObserver = call.GetArgument>(0)); A.CallTo(() => converter.Convert(message)).Returns(null); sourceConverter.Messages.Subscribe(consumer); diff --git a/Obvs/IMessageSource.cs b/Obvs/IMessageSource.cs index 9c88e5e..cf5b441 100644 --- a/Obvs/IMessageSource.cs +++ b/Obvs/IMessageSource.cs @@ -1,4 +1,5 @@ using System; +using System.Reactive.Concurrency; using System.Reactive.Linq; namespace Obvs @@ -7,18 +8,36 @@ public interface IMessageSource : IDisposable where TMessage : class { IObservable Messages { get; } + + /// + /// Observe messages from source with a specific scheduler + /// + /// The scheduler implementation to use + /// Observable of messages in the source + IObservable GetMessages(IScheduler scheduler); } - public class DefaultMessageSource : IMessageSource - where TMessage : class + public abstract class BaseMessageSource : IMessageSource where TMessage: class { - public void Dispose() - { + public virtual IObservable Messages { + get => GetMessages(DefaultScheduler.Instance); } - public IObservable Messages + public virtual void Dispose() {} + + public abstract IObservable GetMessages(IScheduler scheduler); + } + + + + public class DefaultMessageSource : BaseMessageSource + where TMessage : class + { + + /// + public override IObservable GetMessages(IScheduler scheduler) { - get { return Observable.Empty(); } + return Observable.Empty(scheduler); } } } \ No newline at end of file diff --git a/Obvs/MergedMessageSource.cs b/Obvs/MergedMessageSource.cs index cdd6c9d..70d69cb 100644 --- a/Obvs/MergedMessageSource.cs +++ b/Obvs/MergedMessageSource.cs @@ -1,31 +1,34 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Reactive.Concurrency; using System.Reactive.Linq; using Obvs.Extensions; namespace Obvs { - public class MergedMessageSource : IMessageSource + public class MergedMessageSource : BaseMessageSource where TMessage : class { - private readonly IObservable _messages; - + private readonly IEnumerable> _sources; + public MergedMessageSource(IEnumerable> sources) { - _messages = sources.Select(source => source.Messages).Merge().PublishRefCountRetriable(); - } - - public IObservable Messages - { - get - { - return _messages; + if (sources == null) { + throw new ArgumentNullException(nameof(sources)); } + if (!sources.Any()) { + throw new ArgumentException("At least one source must be specified", nameof(sources)); + } + _sources = sources; } - public void Dispose() - { + /// + public override IObservable GetMessages(IScheduler scheduler) { + return _sources.Select(source => source.GetMessages(scheduler)) + .Merge() + .PublishRefCountRetriable(); } + } } \ No newline at end of file diff --git a/Obvs/MessageSourceConverter.cs b/Obvs/MessageSourceConverter.cs index 808dbca..41335f2 100644 --- a/Obvs/MessageSourceConverter.cs +++ b/Obvs/MessageSourceConverter.cs @@ -1,9 +1,10 @@ using System; +using System.Reactive.Concurrency; using System.Reactive.Linq; namespace Obvs { - public class MessageSourceConverter : IMessageSource + public class MessageSourceConverter : BaseMessageSource where TTo : class where TFrom : class { @@ -16,14 +17,12 @@ public MessageSourceConverter(IMessageSource source, IMessageConverter Messages - { - get - { - return _source.Messages - .Select(ConvertedMessage) - .Where(MessageIsValid); - } + + /// + public override IObservable GetMessages(IScheduler scheduler) { + return _source.GetMessages(scheduler) + .Select(ConvertedMessage) + .Where(MessageIsValid); } private static bool MessageIsValid(TTo msg) @@ -36,7 +35,7 @@ private TTo ConvertedMessage(TFrom obj) return _converter.Convert(obj); } - public void Dispose() + public override void Dispose() { _source.Dispose(); _converter.Dispose(); diff --git a/Obvs/Obvs.csproj b/Obvs/Obvs.csproj index 66ff5b9..58fa2b0 100644 --- a/Obvs/Obvs.csproj +++ b/Obvs/Obvs.csproj @@ -9,14 +9,16 @@ http://christopherread.github.io/Obvs obvs messaging microservice bus messagebus rx reactive servicebus New csproj format + true An observable microservice bus .NET library, based on Reactive Extensions. Search 'Obvs' for other transport, serialization, and logging extensions. + - + diff --git a/Obvs/SubjectMessageBus.cs b/Obvs/SubjectMessageBus.cs index b36d1ba..bc0d04f 100644 --- a/Obvs/SubjectMessageBus.cs +++ b/Obvs/SubjectMessageBus.cs @@ -12,6 +12,8 @@ public class SubjectMessageBus : IMessageBus { private readonly ISubject _subject; + private readonly IObservable _messages; + public SubjectMessageBus() : this(null) { @@ -20,10 +22,13 @@ public SubjectMessageBus() public SubjectMessageBus(IScheduler scheduler) { _subject = Subject.Synchronize(new Subject()); + _messages = CreateMessagesObservable(_subject, scheduler); + } - Messages = scheduler == null ? - _subject.AsObservable() : - _subject.ObserveOn(scheduler) + private static IObservable CreateMessagesObservable(ISubject subject, IScheduler scheduler) { + return scheduler == null ? + subject.AsObservable() : + subject.ObserveOn(scheduler) .PublishRefCountRetriable() .AsObservable(); } @@ -36,10 +41,16 @@ public Task PublishAsync(TMessage message) public void Dispose() { - // synchronized subject is annonymous subject underneath, - // which doesn't implment IDisposable + // synchronized subject is anonymous subject underneath, + // which doesn't implement IDisposable } - public IObservable Messages { get; private set; } + public virtual IObservable Messages => _messages; + + public IObservable GetMessages(IScheduler scheduler) + { + var subject = Subject.Synchronize(new Subject()); + return CreateMessagesObservable(subject, scheduler); + } } } \ No newline at end of file From a3e64db7563c04c9fa6b0de3cec5f0b7c737577f Mon Sep 17 00:00:00 2001 From: Fernando Almeida Date: Wed, 2 Jan 2019 00:02:27 +0000 Subject: [PATCH 2/2] Revised SubjectMessageBus GetMessages(IScheduler) to reuse the same subject that's got was create upon class instantiation --- Obvs/SubjectMessageBus.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Obvs/SubjectMessageBus.cs b/Obvs/SubjectMessageBus.cs index bc0d04f..1e4bc2a 100644 --- a/Obvs/SubjectMessageBus.cs +++ b/Obvs/SubjectMessageBus.cs @@ -47,10 +47,10 @@ public void Dispose() public virtual IObservable Messages => _messages; + /// public IObservable GetMessages(IScheduler scheduler) { - var subject = Subject.Synchronize(new Subject()); - return CreateMessagesObservable(subject, scheduler); + return CreateMessagesObservable(_subject, scheduler); } } } \ No newline at end of file