diff --git a/WebSocketTextClient/WebSocketTextClient.cs b/WebSocketTextClient/WebSocketTextClient.cs index b78a83c..2954180 100644 --- a/WebSocketTextClient/WebSocketTextClient.cs +++ b/WebSocketTextClient/WebSocketTextClient.cs @@ -11,45 +11,33 @@ /// public sealed class WebSocketTextClient : IDisposable { - private readonly CancellationTokenSource tokenSource; - private readonly ClientWebSocket socket; - private readonly Task recieveTask; private readonly int initialRecieveBufferSize; + private readonly bool autoIncreaseRecieveBuffer; - /// Initializes a new instance of the class. - /// The initial buffer size for incoming messages in bytes. - /// True to double the buffer on overflow. Otherwise an will be thrown. - public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreaseRecieveBuffer = true) - : this(CancellationToken.None, initialRecieveBufferSize, autoIncreaseRecieveBuffer) - { - } + private readonly Task recieveTask; + + private CancellationTokenSource tokenSource; /// Initializes a new instance of the class. - /// Cancels pending send and receive operations /// The initial buffer size for incoming messages in bytes. /// True to double the buffer on overflow. Otherwise an will be thrown. - public WebSocketTextClient(CancellationToken cancellationToken, int initialRecieveBufferSize = 1024, bool autoIncreaseRecieveBuffer = true) + public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreaseRecieveBuffer = true) { if (initialRecieveBufferSize <= 0) { throw new ArgumentException("Receive buffer size should be greater than zero", nameof(initialRecieveBufferSize)); } - var internalTokenSource = new CancellationTokenSource(); - this.tokenSource = cancellationToken != CancellationToken.None - ? CancellationTokenSource.CreateLinkedTokenSource(internalTokenSource.Token, cancellationToken) - : internalTokenSource; - - // Register the disconnect method as a fire and forget method to run when the user requests cancellation - this.tokenSource.Token.Register(() => Task.Run(this.DisconnectAsync)); - - this.initialRecieveBufferSize = initialRecieveBufferSize; this.autoIncreaseRecieveBuffer = autoIncreaseRecieveBuffer; - socket = new ClientWebSocket(); - recieveTask = this.RecieveLoopAsync(this.tokenSource.Token); + this.Socket = new ClientWebSocket(); + this.tokenSource = new CancellationTokenSource(); + + // Store the receive task with the a cancellation token. + // This is a fire and forget task that runs completely in the background + this.recieveTask = this.RecieveLoopAsync(this.tokenSource.Token); } /// Signals that response message fully received and ready to process. @@ -64,23 +52,45 @@ public WebSocketTextClient(CancellationToken cancellationToken, int initialRecie /// Signals that the socket has opened a connection. public event EventHandler Opened; + /// Gets whether the underlying socket has an open connection. + public bool Connected + { + get + { + return this.Socket.State == WebSocketState.Open; + } + } + + /// Gets the underlying object. + public ClientWebSocket Socket { get; } + /// Asynchronously connects to WebSocket server and start receiving income messages in separate Task. /// The of the WebSocket server to connect to. - public async Task ConnectAsync(Uri url) + /// The token used to close the socket connection. + public async Task ConnectAsync(Uri url, CancellationToken cancellationToken) { - await socket.ConnectAsync(url, this.tokenSource.Token); - recieveTask.Start(); + // Create a new token source, since we can't reuse an existing and cancelled one. + var internalTokenSource = new CancellationTokenSource(); + this.tokenSource = cancellationToken != CancellationToken.None + ? CancellationTokenSource.CreateLinkedTokenSource(internalTokenSource.Token, cancellationToken) + : internalTokenSource; + // Register the disconnect method as a fire and forget method to run when the user requests cancellation + // Don't pass in the cancellation token from the token source, since the token is cancelling the request. + this.tokenSource.Token.Register(() => Task.Run(this.DisconnectAsync)); + + // Open the connection and raise the opened event. + await Socket.ConnectAsync(url, this.tokenSource.Token); this.Opened?.Invoke(this, EventArgs.Empty); } /// Disconnects the WebSocket gracefully from the server. public async Task DisconnectAsync() { - await this.socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed the connection", CancellationToken.None); + await this.Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed the connection", CancellationToken.None); // Check if cancellation is already requested, - // i.e. the user requested cancellation from outside the class. + // e.g. the user requested cancellation from outside the class. if (!this.tokenSource.IsCancellationRequested) { this.tokenSource.Cancel(); @@ -95,7 +105,7 @@ public void AddHeaders(params KeyValuePair[] headers) { foreach (var header in headers) { - this.socket.Options.SetRequestHeader(header.Key, header.Value); + this.Socket.Options.SetRequestHeader(header.Key, header.Value); } } @@ -104,7 +114,7 @@ public void AddHeaders(params KeyValuePair[] headers) public Task SendAsync(string str) { var bytes = Encoding.UTF8.GetBytes(str); - return socket.SendAsync(new ArraySegment(bytes), WebSocketMessageType.Text, true, tokenSource.Token); + return Socket.SendAsync(new ArraySegment(bytes), WebSocketMessageType.Text, true, tokenSource.Token); } private async Task RecieveLoopAsync(CancellationToken cancellationToken) @@ -113,13 +123,14 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken) try { - while (true) + // Only process any messages when the socket has an open connection. + while (this.Connected) { var writeSegment = new ArraySegment(buffer); WebSocketReceiveResult result; do { - result = await socket.ReceiveAsync(writeSegment, cancellationToken); + result = await Socket.ReceiveAsync(writeSegment, CancellationToken.None); writeSegment = new ArraySegment(buffer, writeSegment.Offset + result.Count, writeSegment.Count - result.Count); // check buffer overflow @@ -139,7 +150,6 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken) } while (!result.EndOfMessage); var responce = Encoding.UTF8.GetString(buffer, 0, writeSegment.Offset); - this.MessageReceived?.Invoke(this, new MessageReceivedEventArgs { Message = responce }); } } @@ -156,7 +166,7 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken) /// public void Dispose() { - socket.Dispose(); + this.Socket.Dispose(); recieveTask.Dispose(); } }