Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Obvs.Tests/Obvs.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
<Copyright>Copyright © Christopher Read 2014</Copyright>
<Authors />
<IsPackable>false</IsPackable>
<GenerateFullPaths>true</GenerateFullPaths>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="FakeItEasy" Version="4.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0" />
<PackageReference Include="FakeItEasy" Version="4.9.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="Microsoft.Reactive.Testing" Version="3.1.1" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Obvs\Obvs.csproj" />
Expand Down
165 changes: 88 additions & 77 deletions Obvs.Tests/TestMergedMessageSource.cs
Original file line number Diff line number Diff line change
@@ -1,120 +1,131 @@
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

using FakeItEasy;

using Obvs.Types;

using Xunit;

namespace Obvs.Tests
{
public class TestMergedMessageSource
{
namespace Obvs.Tests {

public class TestMergedMessageSource {
[Fact]
public void ShouldOnlySubscribeToUnderlyingSourcesOnce()
{
public void ShouldOnlySubscribeToUnderlyingSourcesOnce() {
IMessageSource<IEvent> source1 = A.Fake<IMessageSource<IEvent>>();
IMessageSource<IMessage> source2 = A.Fake<IMessageSource<IMessage>>();
IObservable<IEvent> observable1 = A.Fake<IObservable<IEvent>>();
IObservable<IMessage> observable2 = A.Fake<IObservable<IMessage>>();
IObserver<IMessage> observer = A.Fake<IObserver<IMessage>>();

A.CallTo(() => source1.Messages).Returns(observable1);
A.CallTo(() => source2.Messages).Returns(observable2);

MergedMessageSource<IMessage> mergedMessageSource = new MergedMessageSource<IMessage>(new[] { source1, source2 });

IDisposable sub1 = mergedMessageSource.Messages.OfType<IEvent>().Subscribe(observer);
IDisposable sub2 = mergedMessageSource.Messages.OfType<IMessage>().Subscribe(observer);

A.CallTo(() => source1.Messages).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => observable1.Subscribe(A<IObserver<IMessage>>._)).MustHaveHappened(Repeated.Exactly.Once);

A.CallTo(() => source2.Messages).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => observable2.Subscribe(A<IObserver<IMessage>>._)).MustHaveHappened(Repeated.Exactly.Once);

var scheduler = A.Fake<IScheduler>();

A.CallTo(() => source1.GetMessages(A<IScheduler>._)).Returns(observable1);
A.CallTo(() => source2.GetMessages(A<IScheduler>._)).Returns(observable2);

MergedMessageSource<IMessage> mergedMessageSource = new MergedMessageSource<IMessage>(new [] { source1, source2 });

var mergedMessagesObservable = mergedMessageSource.GetMessages(scheduler);
IDisposable sub1 = mergedMessagesObservable.OfType<IEvent>().Subscribe(observer);
IDisposable sub2 = mergedMessagesObservable.OfType<IMessage>().Subscribe(observer);

A.CallTo(() => source1.GetMessages(A<IScheduler>._))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => observable1.Subscribe(A<IObserver<IMessage>>._))
.MustHaveHappened(Repeated.Exactly.Once);

A.CallTo(() => source2.GetMessages(A<IScheduler>._))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => observable2.Subscribe(A<IObserver<IMessage>>._))
.MustHaveHappened(Repeated.Exactly.Once);

sub1.Dispose();
sub2.Dispose();
}

[Fact]
public void ShouldReturnMessagesFromUnderlyingSources()
{
public void ShouldReturnMessagesFromUnderlyingSources() {
IMessageSource<IEvent> source1 = A.Fake<IMessageSource<IEvent>>();
IMessageSource<IMessage> source2 = A.Fake<IMessageSource<IMessage>>();
IObservable<IEvent> observable1 = A.Fake<IObservable<IEvent>>();
IObservable<IMessage> observable2 = A.Fake<IObservable<IMessage>>();
IObserver<IMessage> observer = A.Fake<IObserver<IMessage>>();
IObserver<IEvent> internalObserver1 = null;
IObserver<IMessage> internalObserver2 = null;
var scheduler = A.Fake<IScheduler>();

A.CallTo(() => source1.Messages).Returns(observable1);
A.CallTo(() => source2.Messages).Returns(observable2);
A.CallTo(() => observable1.Subscribe(A<IObserver<IEvent>>._)).Invokes(call => internalObserver1 = call.GetArgument<IObserver<IEvent>>(0));
A.CallTo(() => observable2.Subscribe(A<IObserver<IMessage>>._)).Invokes(call => internalObserver2 = call.GetArgument<IObserver<IMessage>>(0));
A.CallTo(() => source1.GetMessages(A<IScheduler>._)).Returns(observable1);
A.CallTo(() => source2.GetMessages(A<IScheduler>._)).Returns(observable2);
A.CallTo(() => observable1.Subscribe(A<IObserver<IEvent>>._))
.Invokes(call => internalObserver1 = call.GetArgument<IObserver<IEvent>>(0));
A.CallTo(() => observable2.Subscribe(A<IObserver<IMessage>>._))
.Invokes(call => internalObserver2 = call.GetArgument<IObserver<IMessage>>(0));

MergedMessageSource<IMessage> mergedMessageSource = new MergedMessageSource<IMessage>(new[] { source1, source2 });
MergedMessageSource<IMessage> mergedMessageSource = new MergedMessageSource<IMessage>(new [] { source1, source2 });
var mergedMessagesObservable = mergedMessageSource.GetMessages(scheduler);
var sub1 = mergedMessagesObservable.OfType<IEvent>().Subscribe(observer);
var sub2 = mergedMessagesObservable.OfType<IMessage>().Subscribe(observer);

IDisposable sub1 = mergedMessageSource.Messages.OfType<IEvent>().Subscribe(observer);
IDisposable sub2 = mergedMessageSource.Messages.OfType<IMessage>().Subscribe(observer);
Assert.NotNull(internalObserver1);
Assert.NotNull(internalObserver2);

Assert.NotNull(internalObserver1);
Assert.NotNull(internalObserver2);
IEvent ev1 = A.Fake<IEvent>();
IMessage msg2 = A.Fake<IMessage>();
internalObserver1.OnNext(ev1);
internalObserver2.OnNext(msg2);

IEvent ev1 = A.Fake<IEvent>();
IMessage msg2 = A.Fake<IMessage>();
internalObserver1.OnNext(ev1);
internalObserver2.OnNext(msg2);
A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => observer.OnNext(msg2)).MustHaveHappened(Repeated.Exactly.Once);

A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => observer.OnNext(msg2)).MustHaveHappened(Repeated.Exactly.Once);

sub1.Dispose();
sub2.Dispose();
}
sub1.Dispose();
sub2.Dispose();
}

[Fact]
public void ShouldDisposeUnderlyingSubscriptionOnlyWhenAllSubscriptionsDisposed()
{
IMessageSource<IEvent> source1 = A.Fake<IMessageSource<IEvent>>();
IMessageSource<IMessage> source2 = A.Fake<IMessageSource<IMessage>>();
IObservable<IEvent> observable1 = A.Fake<IObservable<IEvent>>();
IObservable<IMessage> observable2 = A.Fake<IObservable<IMessage>>();
IObserver<IMessage> observer = A.Fake<IObserver<IMessage>>();
IObserver<IEvent> internalObserver1 = null;
IObserver<IMessage> internalObserver2 = null;
[Fact]
public void ShouldDisposeUnderlyingSubscriptionOnlyWhenAllSubscriptionsDisposed() {
IMessageSource<IEvent> source1 = A.Fake<IMessageSource<IEvent>>();
IMessageSource<IMessage> source2 = A.Fake<IMessageSource<IMessage>>();
IObservable<IEvent> observable1 = A.Fake<IObservable<IEvent>>();
IObservable<IMessage> observable2 = A.Fake<IObservable<IMessage>>();
IObserver<IMessage> observer = A.Fake<IObserver<IMessage>>();
IObserver<IEvent> internalObserver1 = null;
IObserver<IMessage> internalObserver2 = null;
var scheduler = A.Fake<IScheduler>();

A.CallTo(() => source1.Messages).Returns(observable1);
A.CallTo(() => source2.Messages).Returns(observable2);
A.CallTo(() => observable1.Subscribe(A<IObserver<IEvent>>._)).Invokes(call => internalObserver1 = call.GetArgument<IObserver<IEvent>>(0));
A.CallTo(() => observable2.Subscribe(A<IObserver<IMessage>>._)).Invokes(call => internalObserver2 = call.GetArgument<IObserver<IMessage>>(0));
A.CallTo(() => source1.GetMessages(A<IScheduler>._)).Returns(observable1);
A.CallTo(() => source2.GetMessages(A<IScheduler>._)).Returns(observable2);
A.CallTo(() => observable1.Subscribe(A<IObserver<IEvent>>._)).Invokes(call => internalObserver1 = call.GetArgument<IObserver<IEvent>>(0));
A.CallTo(() => observable2.Subscribe(A<IObserver<IMessage>>._)).Invokes(call => internalObserver2 = call.GetArgument<IObserver<IMessage>>(0));

MergedMessageSource<IMessage> mergedMessageSource = new MergedMessageSource<IMessage>(new[] { source1, source2 });
MergedMessageSource<IMessage> mergedMessageSource = new MergedMessageSource<IMessage>(new [] { source1, source2 });

IDisposable sub1 = mergedMessageSource.Messages.OfType<IEvent>().Subscribe(observer);
IDisposable sub2 = mergedMessageSource.Messages.OfType<IMessage>().Subscribe(observer);
var mergedMessagesObservable = mergedMessageSource.GetMessages(scheduler);
IDisposable sub1 = mergedMessagesObservable.OfType<IEvent>().Subscribe(observer);
IDisposable sub2 = mergedMessagesObservable.OfType<IMessage>().Subscribe(observer);

Assert.NotNull(internalObserver1);
Assert.NotNull(internalObserver2);
Assert.NotNull(internalObserver1);
Assert.NotNull(internalObserver2);

IEvent ev1 = A.Fake<IEvent>();
IMessage msg1 = A.Fake<IMessage>();
IMessage msg2 = A.Fake<IMessage>();
IEvent ev1 = A.Fake<IEvent>();
IMessage msg1 = A.Fake<IMessage>();
IMessage msg2 = A.Fake<IMessage>();

internalObserver1.OnNext(ev1);
A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice);
internalObserver1.OnNext(ev1);
A.CallTo(() => observer.OnNext(ev1)).MustHaveHappened(Repeated.Exactly.Twice);

// dispose of first subscription
sub1.Dispose();
// dispose of first subscription
sub1.Dispose();

// second subscription should still be active
internalObserver2.OnNext(msg1);
A.CallTo(() => observer.OnNext(msg1)).MustHaveHappened(Repeated.Exactly.Once);
// second subscription should still be active
internalObserver2.OnNext(msg1);
A.CallTo(() => observer.OnNext(msg1)).MustHaveHappened(Repeated.Exactly.Once);

// dispose of second subscription
sub2.Dispose();
// dispose of second subscription
sub2.Dispose();

// no subscriptions should be active
internalObserver2.OnNext(msg2);
A.CallTo(() => observer.OnNext(msg2)).MustNotHaveHappened();
}
// no subscriptions should be active
internalObserver2.OnNext(msg2);
A.CallTo(() => observer.OnNext(msg2)).MustNotHaveHappened();
}
}
}
55 changes: 33 additions & 22 deletions Obvs.Tests/TestMessageSourceConverter.cs
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
using System;
using System.Reactive.Concurrency;

using FakeItEasy;

using Obvs.Types;

using Xunit;

namespace Obvs.Tests
{

public class TestMessageSourceConverter
{
namespace Obvs.Tests {

public class TestMessageSourceConverter {
public class TestMessage : IMessage { }

[Fact]
public void ShouldSubscribeToUnderlyingSourceOnSubscribe()
{
public void ShouldSubscribeToUnderlyingSourceOnSubscribe() {
IMessageSource<TestMessage> source = A.Fake<IMessageSource<TestMessage>>();
IObservable<TestMessage> sourceMessagesObservable = A.Fake<IObservable<TestMessage>>();
IMessageSource<TestMessage> sourceConverter = new MessageSourceConverter<TestMessage, TestMessage>(source, A.Fake<IMessageConverter<TestMessage, TestMessage>>());
IScheduler scheduler = A.Fake<IScheduler>();
IObserver<TestMessage> consumer = A.Fake<IObserver<TestMessage>>();

sourceConverter.Messages.Subscribe(consumer);
A.CallTo(() => source.GetMessages(A<IScheduler>._)).Returns(sourceMessagesObservable);
sourceConverter.GetMessages(scheduler).Subscribe(consumer);

A.CallTo(() => source.Messages.Subscribe(A<IObserver<TestMessage>>.Ignored)).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => source.GetMessages(scheduler).Subscribe(A<IObserver<TestMessage>>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}

[Fact]
public void ShouldConvertAndPublishMessages()
{
IMessageSource<TestMessage> source = A.Fake<IMessageSource<TestMessage>>();
IMessageConverter<TestMessage, TestMessage> converter = A.Fake<IMessageConverter<TestMessage, TestMessage>>();
IMessageSource<TestMessage> sourceConverter = new MessageSourceConverter<TestMessage, TestMessage>(source, converter);
public void ShouldConvertAndPublishMessages() {
var source = A.Fake<IMessageSource<TestMessage>>();
var sourceMessagesObservable = A.Fake<IObservable<TestMessage>>();
var converter = A.Fake<IMessageConverter<TestMessage, TestMessage>>();
var sourceConverter = new MessageSourceConverter<TestMessage, TestMessage>(source, converter);
IObserver<TestMessage> internalObserver = null;
IObserver<TestMessage> consumer = A.Fake<IObserver<TestMessage>>();
TestMessage message = new TestMessage();
TestMessage convertedMessage = new TestMessage();
var consumer = A.Fake<IObserver<TestMessage>>();
var scheduler = A.Fake<IScheduler>();
var message = new TestMessage();
var convertedMessage = new TestMessage();

A.CallTo(() => source.Messages.Subscribe(A<IObserver<TestMessage>>.Ignored)).Invokes(call => internalObserver = call.GetArgument<IObserver<TestMessage>>(0));
A.CallTo(() => source.GetMessages(A<IScheduler>._)).Returns(sourceMessagesObservable);
A.CallTo(() => source.GetMessages(scheduler).Subscribe(A<IObserver<TestMessage>>.Ignored))
.Invokes(call => internalObserver = call.GetArgument<IObserver<TestMessage>>(0));
A.CallTo(() => converter.Convert(message)).Returns(convertedMessage);

sourceConverter.Messages.Subscribe(consumer);
Expand All @@ -45,16 +53,19 @@ public void ShouldConvertAndPublishMessages()
}

[Fact]
public void ShouldNotPublishInvalidMessages()
{
public void ShouldNotPublishInvalidMessages() {
IMessageSource<TestMessage> source = A.Fake<IMessageSource<TestMessage>>();
IObservable<TestMessage> sourceMessagesObservable = A.Fake<IObservable<TestMessage>>();
IMessageConverter<TestMessage, TestMessage> converter = A.Fake<IMessageConverter<TestMessage, TestMessage>>();
IMessageSource<TestMessage> sourceConverter = new MessageSourceConverter<TestMessage, TestMessage>(source, converter);
IObserver<TestMessage> internalObserver = null;
IObserver<TestMessage> consumer = A.Fake<IObserver<TestMessage>>();
IScheduler scheduler = A.Fake<IScheduler>();
TestMessage message = new TestMessage();

A.CallTo(() => source.Messages.Subscribe(A<IObserver<TestMessage>>.Ignored)).Invokes(call => internalObserver = call.GetArgument<IObserver<TestMessage>>(0));

A.CallTo(() => source.GetMessages(A<IScheduler>._)).Returns(sourceMessagesObservable);
A.CallTo(() => source.GetMessages(scheduler).Subscribe(A<IObserver<TestMessage>>.Ignored))
.Invokes(call => internalObserver = call.GetArgument<IObserver<TestMessage>>(0));
A.CallTo(() => converter.Convert(message)).Returns(null);

sourceConverter.Messages.Subscribe(consumer);
Expand Down
31 changes: 25 additions & 6 deletions Obvs/IMessageSource.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace Obvs
Expand All @@ -7,18 +8,36 @@ public interface IMessageSource<out TMessage> : IDisposable
where TMessage : class
{
IObservable<TMessage> Messages { get; }

/// <summary>
/// Observe messages from source with a specific scheduler
/// </summary>
/// <param name="scheduler">The scheduler implementation to use</param>
/// <returns>Observable of messages in the source</returns>
IObservable<TMessage> GetMessages(IScheduler scheduler);
}

public class DefaultMessageSource<TMessage> : IMessageSource<TMessage>
where TMessage : class
public abstract class BaseMessageSource<TMessage> : IMessageSource<TMessage> where TMessage: class
{
public void Dispose()
{
public virtual IObservable<TMessage> Messages {
get => GetMessages(DefaultScheduler.Instance);
}

public IObservable<TMessage> Messages
public virtual void Dispose() {}

public abstract IObservable<TMessage> GetMessages(IScheduler scheduler);
}



public class DefaultMessageSource<TMessage> : BaseMessageSource<TMessage>
where TMessage : class
{

/// <inheritdoc />
public override IObservable<TMessage> GetMessages(IScheduler scheduler)
{
get { return Observable.Empty<TMessage>(); }
return Observable.Empty<TMessage>(scheduler);
}
}
}
Loading