Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 44 additions & 34 deletions WebSocketTextClient/WebSocketTextClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,33 @@
/// <seealso cref="System.Net.WebSockets.ClientWebSocket"/>
public sealed class WebSocketTextClient : IDisposable
{
private readonly CancellationTokenSource tokenSource;
private readonly ClientWebSocket socket;
private readonly Task recieveTask;
private readonly int initialRecieveBufferSize;

private readonly bool autoIncreaseRecieveBuffer;

/// <summary>Initializes a new instance of the <see cref="WebSocketTextClient"/> class.</summary>
/// <param name="initialRecieveBufferSize">The initial buffer size for incoming messages in bytes.</param>
/// <param name="autoIncreaseRecieveBuffer">True to double the buffer on overflow. Otherwise an <see cref="InvalidOperationException"/> will be thrown.</param>
public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreaseRecieveBuffer = true)
: this(CancellationToken.None, initialRecieveBufferSize, autoIncreaseRecieveBuffer)
{
}
private readonly Task recieveTask;

private CancellationTokenSource tokenSource;

/// <summary>Initializes a new instance of the <see cref="WebSocketTextClient"/> class.</summary>
/// <param name="cancellationToken">Cancels pending send and receive operations</param>
/// <param name="initialRecieveBufferSize">The initial buffer size for incoming messages in bytes.</param>
/// <param name="autoIncreaseRecieveBuffer">True to double the buffer on overflow. Otherwise an <see cref="InvalidOperationException"/> will be thrown.</param>
public WebSocketTextClient(CancellationToken cancellationToken, int initialRecieveBufferSize = 1024, bool autoIncreaseRecieveBuffer = true)
public WebSocketTextClient(int initialRecieveBufferSize = 1024, bool autoIncreaseRecieveBuffer = true)
{
if (initialRecieveBufferSize <= 0)
{
throw new ArgumentException("Receive buffer size should be greater than zero", nameof(initialRecieveBufferSize));
}

var internalTokenSource = new CancellationTokenSource();
this.tokenSource = cancellationToken != CancellationToken.None
? CancellationTokenSource.CreateLinkedTokenSource(internalTokenSource.Token, cancellationToken)
: internalTokenSource;

// Register the disconnect method as a fire and forget method to run when the user requests cancellation
this.tokenSource.Token.Register(() => Task.Run(this.DisconnectAsync));


this.initialRecieveBufferSize = initialRecieveBufferSize;
this.autoIncreaseRecieveBuffer = autoIncreaseRecieveBuffer;

socket = new ClientWebSocket();
recieveTask = this.RecieveLoopAsync(this.tokenSource.Token);
this.Socket = new ClientWebSocket();
this.tokenSource = new CancellationTokenSource();

// Store the receive task with the a cancellation token.
// This is a fire and forget task that runs completely in the background
this.recieveTask = this.RecieveLoopAsync(this.tokenSource.Token);
}

/// <summary>Signals that response message fully received and ready to process.</summary>
Expand All @@ -64,23 +52,45 @@ public WebSocketTextClient(CancellationToken cancellationToken, int initialRecie
/// <summary>Signals that the socket has opened a connection.</summary>
public event EventHandler Opened;

/// <summary>Gets whether the underlying socket has an open connection.</summary>
public bool Connected
{
get
{
return this.Socket.State == WebSocketState.Open;
}
}

/// <summary>Gets the underlying <see cref="ClientWebSocket" /> object.</summary>
public ClientWebSocket Socket { get; }

/// <summary>Asynchronously connects to WebSocket server and start receiving income messages in separate Task.</summary>
/// <param name="url">The <see cref="Uri"/> of the WebSocket server to connect to.</param>
public async Task ConnectAsync(Uri url)
/// <param name="cancellationToken">The token used to close the socket connection.</param>
public async Task ConnectAsync(Uri url, CancellationToken cancellationToken)
{
await socket.ConnectAsync(url, this.tokenSource.Token);
recieveTask.Start();
// Create a new token source, since we can't reuse an existing and cancelled one.
var internalTokenSource = new CancellationTokenSource();
this.tokenSource = cancellationToken != CancellationToken.None
? CancellationTokenSource.CreateLinkedTokenSource(internalTokenSource.Token, cancellationToken)
: internalTokenSource;

// Register the disconnect method as a fire and forget method to run when the user requests cancellation
// Don't pass in the cancellation token from the token source, since the token is cancelling the request.
this.tokenSource.Token.Register(() => Task.Run(this.DisconnectAsync));

// Open the connection and raise the opened event.
await Socket.ConnectAsync(url, this.tokenSource.Token);
this.Opened?.Invoke(this, EventArgs.Empty);
}

/// <summary>Disconnects the WebSocket gracefully from the server.</summary>
public async Task DisconnectAsync()
{
await this.socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed the connection", CancellationToken.None);
await this.Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed the connection", CancellationToken.None);

// Check if cancellation is already requested,
// i.e. the user requested cancellation from outside the class.
// e.g. the user requested cancellation from outside the class.
if (!this.tokenSource.IsCancellationRequested)
{
this.tokenSource.Cancel();
Expand All @@ -95,7 +105,7 @@ public void AddHeaders(params KeyValuePair<string, string>[] headers)
{
foreach (var header in headers)
{
this.socket.Options.SetRequestHeader(header.Key, header.Value);
this.Socket.Options.SetRequestHeader(header.Key, header.Value);
}
}

Expand All @@ -104,7 +114,7 @@ public void AddHeaders(params KeyValuePair<string, string>[] headers)
public Task SendAsync(string str)
{
var bytes = Encoding.UTF8.GetBytes(str);
return socket.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, tokenSource.Token);
return Socket.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, tokenSource.Token);
}

private async Task RecieveLoopAsync(CancellationToken cancellationToken)
Expand All @@ -113,13 +123,14 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken)

try
{
while (true)
// Only process any messages when the socket has an open connection.
while (this.Connected)
{
var writeSegment = new ArraySegment<byte>(buffer);
WebSocketReceiveResult result;
do
{
result = await socket.ReceiveAsync(writeSegment, cancellationToken);
result = await Socket.ReceiveAsync(writeSegment, CancellationToken.None);
writeSegment = new ArraySegment<byte>(buffer, writeSegment.Offset + result.Count, writeSegment.Count - result.Count);

// check buffer overflow
Expand All @@ -139,7 +150,6 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken)
} while (!result.EndOfMessage);

var responce = Encoding.UTF8.GetString(buffer, 0, writeSegment.Offset);

this.MessageReceived?.Invoke(this, new MessageReceivedEventArgs { Message = responce });
}
}
Expand All @@ -156,7 +166,7 @@ private async Task RecieveLoopAsync(CancellationToken cancellationToken)
/// </remarks>
public void Dispose()
{
socket.Dispose();
this.Socket.Dispose();
recieveTask.Dispose();
}
}
Expand Down