diff --git a/Obvs.Tests/Obvs.Tests.csproj b/Obvs.Tests/Obvs.Tests.csproj index d357150..c83b442 100644 --- a/Obvs.Tests/Obvs.Tests.csproj +++ b/Obvs.Tests/Obvs.Tests.csproj @@ -5,13 +5,14 @@ Copyright © Christopher Read 2014 false + true - - + + - - + + diff --git a/Obvs.Tests/TestMergedMessageSource.cs b/Obvs.Tests/TestMergedMessageSource.cs index e9da4fb..86cec14 100644 --- a/Obvs.Tests/TestMergedMessageSource.cs +++ b/Obvs.Tests/TestMergedMessageSource.cs @@ -1,43 +1,50 @@ using System; +using System.Reactive.Concurrency; using System.Reactive.Linq; + using FakeItEasy; + using Obvs.Types; + using Xunit; -namespace Obvs.Tests -{ - public class TestMergedMessageSource - { +namespace Obvs.Tests { + + public class TestMergedMessageSource { [Fact] - public void ShouldOnlySubscribeToUnderlyingSourcesOnce() - { + public void ShouldOnlySubscribeToUnderlyingSourcesOnce() { IMessageSource source1 = A.Fake>(); IMessageSource source2 = A.Fake>(); IObservable observable1 = A.Fake>(); IObservable observable2 = A.Fake>(); IObserver observer = A.Fake>(); - - A.CallTo(() => source1.Messages).Returns(observable1); - A.CallTo(() => source2.Messages).Returns(observable2); - - MergedMessageSource mergedMessageSource = new MergedMessageSource(new[] { source1, source2 }); - - IDisposable sub1 = mergedMessageSource.Messages.OfType().Subscribe(observer); - IDisposable sub2 = mergedMessageSource.Messages.OfType().Subscribe(observer); - - A.CallTo(() => source1.Messages).MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => observable1.Subscribe(A>._)).MustHaveHappened(Repeated.Exactly.Once); - - A.CallTo(() => source2.Messages).MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => observable2.Subscribe(A>._)).MustHaveHappened(Repeated.Exactly.Once); - + var scheduler = A.Fake(); + + A.CallTo(() => source1.GetMessages(A._)).Returns(observable1); + A.CallTo(() => source2.GetMessages(A._)).Returns(observable2); + + MergedMessageSource mergedMessageSource = new MergedMessageSource(new [] { source1, source2 }); + + var mergedMessagesObservable = mergedMessageSource.GetMessages(scheduler); + IDisposable sub1 = mergedMessagesObservable.OfType().Subscribe(observer); + IDisposable sub2 = mergedMessagesObservable.OfType().Subscribe(observer); + + A.CallTo(() => source1.GetMessages(A._)) + .MustHaveHappened(Repeated.Exactly.Once); + A.CallTo(() => observable1.Subscribe(A>._)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => source2.GetMessages(A._)) + .MustHaveHappened(Repeated.Exactly.Once); + A.CallTo(() => observable2.Subscribe(A>._)) + .MustHaveHappened(Repeated.Exactly.Once); + sub1.Dispose(); sub2.Dispose(); } [Fact] - public void ShouldReturnMessagesFromUnderlyingSources() - { + public void ShouldReturnMessagesFromUnderlyingSources() { IMessageSource source1 = A.Fake>(); IMessageSource source2 = A.Fake>(); IObservable observable1 = A.Fake>(); @@ -45,76 +52,80 @@ public void ShouldReturnMessagesFromUnderlyingSources() IObserver observer = A.Fake>(); IObserver internalObserver1 = null; IObserver internalObserver2 = null; + var scheduler = A.Fake(); - A.CallTo(() => source1.Messages).Returns(observable1); - A.CallTo(() => source2.Messages).Returns(observable2); - A.CallTo(() => observable1.Subscribe(A>._)).Invokes(call => internalObserver1 = call.GetArgument>(0)); - A.CallTo(() => observable2.Subscribe(A>._)).Invokes(call => internalObserver2 = call.GetArgument>(0)); + A.CallTo(() => source1.GetMessages(A._)).Returns(observable1); + A.CallTo(() => source2.GetMessages(A._)).Returns(observable2); + A.CallTo(() => observable1.Subscribe(A>._)) + .Invokes(call => internalObserver1 = call.GetArgument>(0)); + A.CallTo(() => observable2.Subscribe(A>._)) + .Invokes(call => internalObserver2 = call.GetArgument>(0)); - MergedMessageSource mergedMessageSource = new MergedMessageSource(new[] { source1, source2 }); + MergedMessageSource mergedMessageSource = new MergedMessageSource(new [] { source1, source2 }); + var mergedMessagesObservable = mergedMessageSource.GetMessages(scheduler); + var sub1 = mergedMessagesObservable.OfType().Subscribe(observer); + var sub2 = mergedMessagesObservable.OfType().Subscribe(observer); - IDisposable sub1 = mergedMessageSource.Messages.OfType().Subscribe(observer); - IDisposable sub2 = mergedMessageSource.Messages.OfType().Subscribe(observer); + Assert.NotNull(internalObserver1); + Assert.NotNull(internalObserver2); - Assert.NotNull(internalObserver1); - Assert.NotNull(internalObserver2); + IEvent ev1 = A.Fake(); + IMessage msg2 = A.Fake(); + internalObserver1.OnNext(ev1); + internalObserver2.OnNext(msg2); - IEvent ev1 = A.Fake(); - IMessage msg2 = A.Fake(); - internalObserver1.OnNext(ev1); - internalObserver2.OnNext(msg2); + A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice); + A.CallTo(() => observer.OnNext(msg2)).MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice); - A.CallTo(() => observer.OnNext(msg2)).MustHaveHappened(Repeated.Exactly.Once); - - sub1.Dispose(); - sub2.Dispose(); - } + sub1.Dispose(); + sub2.Dispose(); + } - [Fact] - public void ShouldDisposeUnderlyingSubscriptionOnlyWhenAllSubscriptionsDisposed() - { - IMessageSource source1 = A.Fake>(); - IMessageSource source2 = A.Fake>(); - IObservable observable1 = A.Fake>(); - IObservable observable2 = A.Fake>(); - IObserver observer = A.Fake>(); - IObserver internalObserver1 = null; - IObserver internalObserver2 = null; + [Fact] + public void ShouldDisposeUnderlyingSubscriptionOnlyWhenAllSubscriptionsDisposed() { + IMessageSource source1 = A.Fake>(); + IMessageSource source2 = A.Fake>(); + IObservable observable1 = A.Fake>(); + IObservable observable2 = A.Fake>(); + IObserver observer = A.Fake>(); + IObserver internalObserver1 = null; + IObserver internalObserver2 = null; + var scheduler = A.Fake(); - A.CallTo(() => source1.Messages).Returns(observable1); - A.CallTo(() => source2.Messages).Returns(observable2); - A.CallTo(() => observable1.Subscribe(A>._)).Invokes(call => internalObserver1 = call.GetArgument>(0)); - A.CallTo(() => observable2.Subscribe(A>._)).Invokes(call => internalObserver2 = call.GetArgument>(0)); + A.CallTo(() => source1.GetMessages(A._)).Returns(observable1); + A.CallTo(() => source2.GetMessages(A._)).Returns(observable2); + A.CallTo(() => observable1.Subscribe(A>._)).Invokes(call => internalObserver1 = call.GetArgument>(0)); + A.CallTo(() => observable2.Subscribe(A>._)).Invokes(call => internalObserver2 = call.GetArgument>(0)); - MergedMessageSource mergedMessageSource = new MergedMessageSource(new[] { source1, source2 }); + MergedMessageSource mergedMessageSource = new MergedMessageSource(new [] { source1, source2 }); - IDisposable sub1 = mergedMessageSource.Messages.OfType().Subscribe(observer); - IDisposable sub2 = mergedMessageSource.Messages.OfType().Subscribe(observer); + var mergedMessagesObservable = mergedMessageSource.GetMessages(scheduler); + IDisposable sub1 = mergedMessagesObservable.OfType().Subscribe(observer); + IDisposable sub2 = mergedMessagesObservable.OfType().Subscribe(observer); - Assert.NotNull(internalObserver1); - Assert.NotNull(internalObserver2); + Assert.NotNull(internalObserver1); + Assert.NotNull(internalObserver2); - IEvent ev1 = A.Fake(); - IMessage msg1 = A.Fake(); - IMessage msg2 = A.Fake(); + IEvent ev1 = A.Fake(); + IMessage msg1 = A.Fake(); + IMessage msg2 = A.Fake(); - internalObserver1.OnNext(ev1); - A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice); + internalObserver1.OnNext(ev1); + A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice); - // dispose of first subscription - sub1.Dispose(); + // dispose of first subscription + sub1.Dispose(); - // second subscription should still be active - internalObserver2.OnNext(msg1); - A.CallTo(() => observer.OnNext(msg1)).MustHaveHappened(Repeated.Exactly.Once); + // second subscription should still be active + internalObserver2.OnNext(msg1); + A.CallTo(() => observer.OnNext(msg1)).MustHaveHappened(Repeated.Exactly.Once); - // dispose of second subscription - sub2.Dispose(); + // dispose of second subscription + sub2.Dispose(); - // no subscriptions should be active - internalObserver2.OnNext(msg2); - A.CallTo(() => observer.OnNext(msg2)).MustNotHaveHappened(); - } + // no subscriptions should be active + internalObserver2.OnNext(msg2); + A.CallTo(() => observer.OnNext(msg2)).MustNotHaveHappened(); } } +} \ No newline at end of file diff --git a/Obvs.Tests/TestMessageSourceConverter.cs b/Obvs.Tests/TestMessageSourceConverter.cs index 3b881a8..4c7c7bf 100644 --- a/Obvs.Tests/TestMessageSourceConverter.cs +++ b/Obvs.Tests/TestMessageSourceConverter.cs @@ -1,39 +1,47 @@ using System; +using System.Reactive.Concurrency; + using FakeItEasy; + using Obvs.Types; + using Xunit; -namespace Obvs.Tests -{ - - public class TestMessageSourceConverter - { +namespace Obvs.Tests { + + public class TestMessageSourceConverter { public class TestMessage : IMessage { } [Fact] - public void ShouldSubscribeToUnderlyingSourceOnSubscribe() - { + public void ShouldSubscribeToUnderlyingSourceOnSubscribe() { IMessageSource source = A.Fake>(); + IObservable sourceMessagesObservable = A.Fake>(); IMessageSource sourceConverter = new MessageSourceConverter(source, A.Fake>()); + IScheduler scheduler = A.Fake(); IObserver consumer = A.Fake>(); - sourceConverter.Messages.Subscribe(consumer); + A.CallTo(() => source.GetMessages(A._)).Returns(sourceMessagesObservable); + sourceConverter.GetMessages(scheduler).Subscribe(consumer); - A.CallTo(() => source.Messages.Subscribe(A>.Ignored)).MustHaveHappened(Repeated.Exactly.Once); + A.CallTo(() => source.GetMessages(scheduler).Subscribe(A>.Ignored)) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] - public void ShouldConvertAndPublishMessages() - { - IMessageSource source = A.Fake>(); - IMessageConverter converter = A.Fake>(); - IMessageSource sourceConverter = new MessageSourceConverter(source, converter); + public void ShouldConvertAndPublishMessages() { + var source = A.Fake>(); + var sourceMessagesObservable = A.Fake>(); + var converter = A.Fake>(); + var sourceConverter = new MessageSourceConverter(source, converter); IObserver internalObserver = null; - IObserver consumer = A.Fake>(); - TestMessage message = new TestMessage(); - TestMessage convertedMessage = new TestMessage(); + var consumer = A.Fake>(); + var scheduler = A.Fake(); + var message = new TestMessage(); + var convertedMessage = new TestMessage(); - A.CallTo(() => source.Messages.Subscribe(A>.Ignored)).Invokes(call => internalObserver = call.GetArgument>(0)); + A.CallTo(() => source.GetMessages(A._)).Returns(sourceMessagesObservable); + A.CallTo(() => source.GetMessages(scheduler).Subscribe(A>.Ignored)) + .Invokes(call => internalObserver = call.GetArgument>(0)); A.CallTo(() => converter.Convert(message)).Returns(convertedMessage); sourceConverter.Messages.Subscribe(consumer); @@ -45,16 +53,19 @@ public void ShouldConvertAndPublishMessages() } [Fact] - public void ShouldNotPublishInvalidMessages() - { + public void ShouldNotPublishInvalidMessages() { IMessageSource source = A.Fake>(); + IObservable sourceMessagesObservable = A.Fake>(); IMessageConverter converter = A.Fake>(); IMessageSource sourceConverter = new MessageSourceConverter(source, converter); IObserver internalObserver = null; IObserver consumer = A.Fake>(); + IScheduler scheduler = A.Fake(); TestMessage message = new TestMessage(); - - A.CallTo(() => source.Messages.Subscribe(A>.Ignored)).Invokes(call => internalObserver = call.GetArgument>(0)); + + A.CallTo(() => source.GetMessages(A._)).Returns(sourceMessagesObservable); + A.CallTo(() => source.GetMessages(scheduler).Subscribe(A>.Ignored)) + .Invokes(call => internalObserver = call.GetArgument>(0)); A.CallTo(() => converter.Convert(message)).Returns(null); sourceConverter.Messages.Subscribe(consumer); diff --git a/Obvs/IMessageSource.cs b/Obvs/IMessageSource.cs index 9c88e5e..cf5b441 100644 --- a/Obvs/IMessageSource.cs +++ b/Obvs/IMessageSource.cs @@ -1,4 +1,5 @@ using System; +using System.Reactive.Concurrency; using System.Reactive.Linq; namespace Obvs @@ -7,18 +8,36 @@ public interface IMessageSource : IDisposable where TMessage : class { IObservable Messages { get; } + + /// + /// Observe messages from source with a specific scheduler + /// + /// The scheduler implementation to use + /// Observable of messages in the source + IObservable GetMessages(IScheduler scheduler); } - public class DefaultMessageSource : IMessageSource - where TMessage : class + public abstract class BaseMessageSource : IMessageSource where TMessage: class { - public void Dispose() - { + public virtual IObservable Messages { + get => GetMessages(DefaultScheduler.Instance); } - public IObservable Messages + public virtual void Dispose() {} + + public abstract IObservable GetMessages(IScheduler scheduler); + } + + + + public class DefaultMessageSource : BaseMessageSource + where TMessage : class + { + + /// + public override IObservable GetMessages(IScheduler scheduler) { - get { return Observable.Empty(); } + return Observable.Empty(scheduler); } } } \ No newline at end of file diff --git a/Obvs/MergedMessageSource.cs b/Obvs/MergedMessageSource.cs index cdd6c9d..70d69cb 100644 --- a/Obvs/MergedMessageSource.cs +++ b/Obvs/MergedMessageSource.cs @@ -1,31 +1,34 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Reactive.Concurrency; using System.Reactive.Linq; using Obvs.Extensions; namespace Obvs { - public class MergedMessageSource : IMessageSource + public class MergedMessageSource : BaseMessageSource where TMessage : class { - private readonly IObservable _messages; - + private readonly IEnumerable> _sources; + public MergedMessageSource(IEnumerable> sources) { - _messages = sources.Select(source => source.Messages).Merge().PublishRefCountRetriable(); - } - - public IObservable Messages - { - get - { - return _messages; + if (sources == null) { + throw new ArgumentNullException(nameof(sources)); } + if (!sources.Any()) { + throw new ArgumentException("At least one source must be specified", nameof(sources)); + } + _sources = sources; } - public void Dispose() - { + /// + public override IObservable GetMessages(IScheduler scheduler) { + return _sources.Select(source => source.GetMessages(scheduler)) + .Merge() + .PublishRefCountRetriable(); } + } } \ No newline at end of file diff --git a/Obvs/MessageSourceConverter.cs b/Obvs/MessageSourceConverter.cs index 808dbca..41335f2 100644 --- a/Obvs/MessageSourceConverter.cs +++ b/Obvs/MessageSourceConverter.cs @@ -1,9 +1,10 @@ using System; +using System.Reactive.Concurrency; using System.Reactive.Linq; namespace Obvs { - public class MessageSourceConverter : IMessageSource + public class MessageSourceConverter : BaseMessageSource where TTo : class where TFrom : class { @@ -16,14 +17,12 @@ public MessageSourceConverter(IMessageSource source, IMessageConverter Messages - { - get - { - return _source.Messages - .Select(ConvertedMessage) - .Where(MessageIsValid); - } + + /// + public override IObservable GetMessages(IScheduler scheduler) { + return _source.GetMessages(scheduler) + .Select(ConvertedMessage) + .Where(MessageIsValid); } private static bool MessageIsValid(TTo msg) @@ -36,7 +35,7 @@ private TTo ConvertedMessage(TFrom obj) return _converter.Convert(obj); } - public void Dispose() + public override void Dispose() { _source.Dispose(); _converter.Dispose(); diff --git a/Obvs/Obvs.csproj b/Obvs/Obvs.csproj index 66ff5b9..58fa2b0 100644 --- a/Obvs/Obvs.csproj +++ b/Obvs/Obvs.csproj @@ -9,14 +9,16 @@ http://christopherread.github.io/Obvs obvs messaging microservice bus messagebus rx reactive servicebus New csproj format + true An observable microservice bus .NET library, based on Reactive Extensions. Search 'Obvs' for other transport, serialization, and logging extensions. + - + diff --git a/Obvs/SubjectMessageBus.cs b/Obvs/SubjectMessageBus.cs index b36d1ba..1e4bc2a 100644 --- a/Obvs/SubjectMessageBus.cs +++ b/Obvs/SubjectMessageBus.cs @@ -12,6 +12,8 @@ public class SubjectMessageBus : IMessageBus { private readonly ISubject _subject; + private readonly IObservable _messages; + public SubjectMessageBus() : this(null) { @@ -20,10 +22,13 @@ public SubjectMessageBus() public SubjectMessageBus(IScheduler scheduler) { _subject = Subject.Synchronize(new Subject()); + _messages = CreateMessagesObservable(_subject, scheduler); + } - Messages = scheduler == null ? - _subject.AsObservable() : - _subject.ObserveOn(scheduler) + private static IObservable CreateMessagesObservable(ISubject subject, IScheduler scheduler) { + return scheduler == null ? + subject.AsObservable() : + subject.ObserveOn(scheduler) .PublishRefCountRetriable() .AsObservable(); } @@ -36,10 +41,16 @@ public Task PublishAsync(TMessage message) public void Dispose() { - // synchronized subject is annonymous subject underneath, - // which doesn't implment IDisposable + // synchronized subject is anonymous subject underneath, + // which doesn't implement IDisposable } - public IObservable Messages { get; private set; } + public virtual IObservable Messages => _messages; + + /// + public IObservable GetMessages(IScheduler scheduler) + { + return CreateMessagesObservable(_subject, scheduler); + } } } \ No newline at end of file