From 18bf20304e2d3b950ec303f3a9d8491371617eb7 Mon Sep 17 00:00:00 2001 From: Robert Logiewa Date: Fri, 20 Oct 2017 16:07:01 +0200 Subject: [PATCH 1/4] Move cancellation logic into the ConnectAsync() method With cancellation logic in the constructor a reuse of the socket is not possible. By moving it into the ConnectAsync() method the client can connect and disconnect without always creating a new instance of the WebSocketTextClient class. --- WebSocketTextClient/WebSocketTextClient.cs | 50 ++++++++++------------ 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/WebSocketTextClient/WebSocketTextClient.cs b/WebSocketTextClient/WebSocketTextClient.cs index b78a83c..7e8a96d 100644 --- a/WebSocketTextClient/WebSocketTextClient.cs +++ b/WebSocketTextClient/WebSocketTextClient.cs @@ -11,45 +11,27 @@ /// public sealed class WebSocketTextClient : IDisposable { - private readonly CancellationTokenSource tokenSource; private readonly ClientWebSocket socket; - private readonly Task recieveTask; private readonly int initialRecieveBufferSize; private readonly bool autoIncreaseRecieveBuffer; - /// Initializes a new instance of the class. - /// The initial buffer size for incoming messages in bytes. - /// True to double the buffer on overflow. Otherwise an will be thrown. - public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreaseRecieveBuffer = true) - : this(CancellationToken.None, initialRecieveBufferSize, autoIncreaseRecieveBuffer) - { - } + private CancellationTokenSource tokenSource; + private Task recieveTask; /// Initializes a new instance of the class. - /// Cancels pending send and receive operations /// The initial buffer size for incoming messages in bytes. /// True to double the buffer on overflow. Otherwise an will be thrown. - public WebSocketTextClient(CancellationToken cancellationToken, int initialRecieveBufferSize = 1024, bool autoIncreaseRecieveBuffer = true) + public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreaseRecieveBuffer = true) { if (initialRecieveBufferSize <= 0) { throw new ArgumentException("Receive buffer size should be greater than zero", nameof(initialRecieveBufferSize)); } - - var internalTokenSource = new CancellationTokenSource(); - this.tokenSource = cancellationToken != CancellationToken.None - ? CancellationTokenSource.CreateLinkedTokenSource(internalTokenSource.Token, cancellationToken) - : internalTokenSource; - - // Register the disconnect method as a fire and forget method to run when the user requests cancellation - this.tokenSource.Token.Register(() => Task.Run(this.DisconnectAsync)); - - + this.initialRecieveBufferSize = initialRecieveBufferSize; this.autoIncreaseRecieveBuffer = autoIncreaseRecieveBuffer; - socket = new ClientWebSocket(); - recieveTask = this.RecieveLoopAsync(this.tokenSource.Token); + this.socket = new ClientWebSocket(); } /// Signals that response message fully received and ready to process. @@ -66,11 +48,25 @@ public WebSocketTextClient(CancellationToken cancellationToken, int initialRecie /// Asynchronously connects to WebSocket server and start receiving income messages in separate Task. /// The of the WebSocket server to connect to. - public async Task ConnectAsync(Uri url) + /// The token used to close the socket connection. + public async Task ConnectAsync(Uri url, CancellationToken cancellationToken) { - await socket.ConnectAsync(url, this.tokenSource.Token); - recieveTask.Start(); + // Create a new token source, since we can't reuse an existing and cancelled one. + var internalTokenSource = new CancellationTokenSource(); + this.tokenSource = cancellationToken != CancellationToken.None + ? CancellationTokenSource.CreateLinkedTokenSource(internalTokenSource.Token, cancellationToken) + : internalTokenSource; + + // Register the disconnect method as a fire and forget method to run when the user requests cancellation + // Don't pass in the cancellation token from the token source, since the token is cancelling the request. + this.tokenSource.Token.Register(() => Task.Run(this.DisconnectAsync)); + + // Store the receive task with the new cancellation token. + this.recieveTask = this.RecieveLoopAsync(this.tokenSource.Token); + // Open the connection and raise the opened event. + await socket.ConnectAsync(url, this.tokenSource.Token); + this.recieveTask.Start(); this.Opened?.Invoke(this, EventArgs.Empty); } @@ -80,7 +76,7 @@ public async Task DisconnectAsync() await this.socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed the connection", CancellationToken.None); // Check if cancellation is already requested, - // i.e. the user requested cancellation from outside the class. + // e.g. the user requested cancellation from outside the class. if (!this.tokenSource.IsCancellationRequested) { this.tokenSource.Cancel(); From 9409c213d0e938f42087f707324548b2a1cdd4d2 Mon Sep 17 00:00:00 2001 From: Robert Logiewa Date: Tue, 14 Nov 2017 15:07:32 +0100 Subject: [PATCH 2/4] Remove receiveTask.Start() Task.Start() should never be called in the first place. It introduces problems and should be avioded. See: https://stackoverflow.com/a/29693430/1922663 --- WebSocketTextClient/WebSocketTextClient.cs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/WebSocketTextClient/WebSocketTextClient.cs b/WebSocketTextClient/WebSocketTextClient.cs index 7e8a96d..a30b2e4 100644 --- a/WebSocketTextClient/WebSocketTextClient.cs +++ b/WebSocketTextClient/WebSocketTextClient.cs @@ -12,10 +12,13 @@ public sealed class WebSocketTextClient : IDisposable { private readonly ClientWebSocket socket; + private readonly int initialRecieveBufferSize; + private readonly bool autoIncreaseRecieveBuffer; private CancellationTokenSource tokenSource; + private Task recieveTask; /// Initializes a new instance of the class. @@ -31,9 +34,9 @@ public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreas this.initialRecieveBufferSize = initialRecieveBufferSize; this.autoIncreaseRecieveBuffer = autoIncreaseRecieveBuffer; - this.socket = new ClientWebSocket(); + this.socket = new ClientWebSocket(); } - + /// Signals that response message fully received and ready to process. public event EventHandler MessageReceived; @@ -66,7 +69,7 @@ public async Task ConnectAsync(Uri url, CancellationToken cancellationToken) // Open the connection and raise the opened event. await socket.ConnectAsync(url, this.tokenSource.Token); - this.recieveTask.Start(); + await this.recieveTask; this.Opened?.Invoke(this, EventArgs.Empty); } @@ -115,7 +118,7 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken) WebSocketReceiveResult result; do { - result = await socket.ReceiveAsync(writeSegment, cancellationToken); + result = await socket.ReceiveAsync(writeSegment, CancellationToken.None); writeSegment = new ArraySegment(buffer, writeSegment.Offset + result.Count, writeSegment.Count - result.Count); // check buffer overflow @@ -135,7 +138,6 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken) } while (!result.EndOfMessage); var responce = Encoding.UTF8.GetString(buffer, 0, writeSegment.Offset); - this.MessageReceived?.Invoke(this, new MessageReceivedEventArgs { Message = responce }); } } From f5eb24f159efa2335d4cd7186c3c510355eaf405 Mon Sep 17 00:00:00 2001 From: Robert Logiewa Date: Tue, 14 Nov 2017 15:08:21 +0100 Subject: [PATCH 3/4] Add missing TokenSource The internal token source was never initialized, this has been fixed. --- WebSocketTextClient/WebSocketTextClient.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/WebSocketTextClient/WebSocketTextClient.cs b/WebSocketTextClient/WebSocketTextClient.cs index a30b2e4..52821a5 100644 --- a/WebSocketTextClient/WebSocketTextClient.cs +++ b/WebSocketTextClient/WebSocketTextClient.cs @@ -35,6 +35,7 @@ public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreas this.autoIncreaseRecieveBuffer = autoIncreaseRecieveBuffer; this.socket = new ClientWebSocket(); + this.tokenSource = new CancellationTokenSource(); } /// Signals that response message fully received and ready to process. From 48833615d970b978624a906f67fa4969236b70c8 Mon Sep 17 00:00:00 2001 From: Robert Logiewa Date: Tue, 5 Dec 2017 18:05:54 +0100 Subject: [PATCH 4/4] Make WebSocketClient more robust - Receive task is now started as fire and forget in the constructor - Receive task now only handles messages, when the socket has an open connection - Exposed uncerlying socke object as a public read only property - Exposed a public "Connected" read only property --- WebSocketTextClient/WebSocketTextClient.cs | 49 +++++++++++++--------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/WebSocketTextClient/WebSocketTextClient.cs b/WebSocketTextClient/WebSocketTextClient.cs index 52821a5..2954180 100644 --- a/WebSocketTextClient/WebSocketTextClient.cs +++ b/WebSocketTextClient/WebSocketTextClient.cs @@ -11,15 +11,13 @@ /// public sealed class WebSocketTextClient : IDisposable { - private readonly ClientWebSocket socket; - private readonly int initialRecieveBufferSize; private readonly bool autoIncreaseRecieveBuffer; - private CancellationTokenSource tokenSource; + private readonly Task recieveTask; - private Task recieveTask; + private CancellationTokenSource tokenSource; /// Initializes a new instance of the class. /// The initial buffer size for incoming messages in bytes. @@ -30,14 +28,18 @@ public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreas { throw new ArgumentException("Receive buffer size should be greater than zero", nameof(initialRecieveBufferSize)); } - + this.initialRecieveBufferSize = initialRecieveBufferSize; this.autoIncreaseRecieveBuffer = autoIncreaseRecieveBuffer; - this.socket = new ClientWebSocket(); + this.Socket = new ClientWebSocket(); this.tokenSource = new CancellationTokenSource(); + + // Store the receive task with the a cancellation token. + // This is a fire and forget task that runs completely in the background + this.recieveTask = this.RecieveLoopAsync(this.tokenSource.Token); } - + /// Signals that response message fully received and ready to process. public event EventHandler MessageReceived; @@ -50,6 +52,18 @@ public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreas /// Signals that the socket has opened a connection. public event EventHandler Opened; + /// Gets whether the underlying socket has an open connection. + public bool Connected + { + get + { + return this.Socket.State == WebSocketState.Open; + } + } + + /// Gets the underlying object. + public ClientWebSocket Socket { get; } + /// Asynchronously connects to WebSocket server and start receiving income messages in separate Task. /// The of the WebSocket server to connect to. /// The token used to close the socket connection. @@ -60,24 +74,20 @@ public async Task ConnectAsync(Uri url, CancellationToken cancellationToken) this.tokenSource = cancellationToken != CancellationToken.None ? CancellationTokenSource.CreateLinkedTokenSource(internalTokenSource.Token, cancellationToken) : internalTokenSource; - + // Register the disconnect method as a fire and forget method to run when the user requests cancellation // Don't pass in the cancellation token from the token source, since the token is cancelling the request. this.tokenSource.Token.Register(() => Task.Run(this.DisconnectAsync)); - // Store the receive task with the new cancellation token. - this.recieveTask = this.RecieveLoopAsync(this.tokenSource.Token); - // Open the connection and raise the opened event. - await socket.ConnectAsync(url, this.tokenSource.Token); - await this.recieveTask; + await Socket.ConnectAsync(url, this.tokenSource.Token); this.Opened?.Invoke(this, EventArgs.Empty); } /// Disconnects the WebSocket gracefully from the server. public async Task DisconnectAsync() { - await this.socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed the connection", CancellationToken.None); + await this.Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed the connection", CancellationToken.None); // Check if cancellation is already requested, // e.g. the user requested cancellation from outside the class. @@ -95,7 +105,7 @@ public void AddHeaders(params KeyValuePair[] headers) { foreach (var header in headers) { - this.socket.Options.SetRequestHeader(header.Key, header.Value); + this.Socket.Options.SetRequestHeader(header.Key, header.Value); } } @@ -104,7 +114,7 @@ public void AddHeaders(params KeyValuePair[] headers) public Task SendAsync(string str) { var bytes = Encoding.UTF8.GetBytes(str); - return socket.SendAsync(new ArraySegment(bytes), WebSocketMessageType.Text, true, tokenSource.Token); + return Socket.SendAsync(new ArraySegment(bytes), WebSocketMessageType.Text, true, tokenSource.Token); } private async Task RecieveLoopAsync(CancellationToken cancellationToken) @@ -113,13 +123,14 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken) try { - while (true) + // Only process any messages when the socket has an open connection. + while (this.Connected) { var writeSegment = new ArraySegment(buffer); WebSocketReceiveResult result; do { - result = await socket.ReceiveAsync(writeSegment, CancellationToken.None); + result = await Socket.ReceiveAsync(writeSegment, CancellationToken.None); writeSegment = new ArraySegment(buffer, writeSegment.Offset + result.Count, writeSegment.Count - result.Count); // check buffer overflow @@ -155,7 +166,7 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken) /// public void Dispose() { - socket.Dispose(); + this.Socket.Dispose(); recieveTask.Dispose(); } }