Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ThingSet.Client/Schema/ThingSetNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
using System;
using System.Collections.ObjectModel;
using ThingSet.Common;
using ThingSet.Common.Protocols;

namespace ThingSet.Client.Schema;
Expand Down
1 change: 1 addition & 0 deletions src/ThingSet.Client/Schema/ThingSetSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using ThingSet.Common;
using ThingSet.Common.Protocols;

namespace ThingSet.Client.Schema;
Expand Down
4 changes: 2 additions & 2 deletions src/ThingSet.Client/ThingSetClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ public IEnumerable<ThingSetNode> GetNodes(ThingSetNodeEnumerationOptions options
{
throw new IOException("Could not connect to ThingSet endpoint.");
}
_transport.Read(buffer);
int read = _transport.Read(buffer);
ThingSetResponse response = (ThingSetStatus)buffer[0];
if (response.Success)
{
CborReader reader = new CborReader(buffer.AsMemory().Slice(1), CborConformanceMode.Lax, allowMultipleRootLevelValues: true);
CborReader reader = new CborReader(buffer.AsMemory().Slice(1, read - 1), CborConformanceMode.Lax, allowMultipleRootLevelValues: true);
reader.ReadNull();
return CborDeserialiser.Read(reader);
}
Expand Down
20 changes: 16 additions & 4 deletions src/ThingSet.Common.Transports.Can/CanServerTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using SocketCANSharp;
Expand All @@ -27,7 +28,7 @@ public class CanServerTransport : CanTransportBase, IServerTransport

private bool _runPeerSocketHandlers = true;

private Func<byte, byte[], byte[]>? _messageCallback;
private Func<object, Memory<byte>, Memory<byte>>? _messageCallback;

public CanServerTransport(ThingSetCanInterface canInterface) : base(canInterface, leaveOpen: false)
{
Expand All @@ -40,7 +41,9 @@ public CanServerTransport(ThingSetCanInterface canInterface) : base(canInterface
_addressClaimListener.AddressClaimed += OnAddressClaimed;
}

public ValueTask ListenAsync(Func<byte, byte[], byte[]> callback)
public event EventHandler<ErrorEventArgs>? Error;

public ValueTask ListenAsync(Func<object, Memory<byte>, Memory<byte>> callback)
{
_addressClaimListener.Listen();

Expand Down Expand Up @@ -120,6 +123,7 @@ private int WriteFrame(uint canId, byte[] buffer)

private int WriteFdFrame(uint canId, byte[] buffer)
{

CanFdFrame frame = new CanFdFrame
{
CanId = canId,
Expand Down Expand Up @@ -154,8 +158,16 @@ private void RunPeerSocketHandler(object? state)
int read = socket.Read(buffer);
if (read > 0 && _messageCallback is not null)
{
byte[] response = _messageCallback(peerId, buffer);
socket.Write(response);
try
{
Memory<byte> memory = buffer;
Memory<byte> response = _messageCallback(peerId, memory.Slice(0, read));
socket.Write(response.ToArray());
}
catch (Exception ex)
{
Error?.Invoke(this, new ErrorEventArgs(ex));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
<ProjectUrl>https://github.com/Brill-Power/ThingSet.Net</ProjectUrl>
<RepositoryUrl>https://github.com/Brill-Power/ThingSet.Net.git</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<Description>CAN transport for .NET ThingSet client</Description>
<Description>CAN transport for .NET ThingSet client and server</Description>
<PackageId>ThingSet.Common.Transports.Can</PackageId>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<PackageReadmeFile>README.md</PackageReadmeFile>
<PackageTags>iot</PackageTags>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="SocketCANSharp" Version="0.12.0" />
<PackageReference Include="SocketCANSharp" Version="0.13.0" />
</ItemGroup>

<ItemGroup>
Expand Down
21 changes: 9 additions & 12 deletions src/ThingSet.Common.Transports.Ip/IpClientTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
using System.Threading;
using System.Threading.Tasks;
using ThingSet.Common.Protocols.Binary;
using static ThingSet.Common.Transports.Ip.Protocol;

namespace ThingSet.Common.Transports.Ip;

/// <summary>
/// ThingSet client transport for IP (TCP/UDP).
/// </summary>
public class IpClientTransport : IClientTransport
{
private const int MessageSize = 512;
private const int MessageTypePosition = 4;

private readonly string _hostname;
private readonly int _port;

Expand All @@ -31,6 +32,10 @@ public class IpClientTransport : IClientTransport

private Action<ReadOnlyMemory<byte>>? _callback;

public IpClientTransport(string hostname) : this(hostname, Protocol.RequestResponsePort)
{
}

public IpClientTransport(string hostname, int port)
{
_hostname = hostname;
Expand Down Expand Up @@ -70,7 +75,7 @@ public ValueTask SubscribeAsync(Action<ReadOnlyMemory<byte>> callback)
{
_callback = callback;
_udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
_udpClient.Client.Bind(new IPEndPoint(IPAddress.Any, 9002));
_udpClient.Client.Bind(new IPEndPoint(IPAddress.Any, Protocol.PublishSubscribePort));
_subscriptionThread.Start();
return ValueTask.CompletedTask;
}
Expand Down Expand Up @@ -157,12 +162,4 @@ public void Append(byte[] buffer)
Position += buffer.Length - 2;
}
}

private enum MessageType
{
First = 0x0 << MessageTypePosition,
Consecutive = 0x1 << MessageTypePosition,
Last = 0x2 << MessageTypePosition,
Single = 0x3 << MessageTypePosition,
}
}
152 changes: 152 additions & 0 deletions src/ThingSet.Common.Transports.Ip/IpServerTransport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (c) 2025 Brill Power.
*
* SPDX-License-Identifier: Apache-2.0
*/
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using ThingSet.Common.Protocols;
using static ThingSet.Common.Transports.Ip.Protocol;

namespace ThingSet.Common.Transports.Ip;

/// <summary>
/// ThingSet server transport for IP (TCP/UDP).
/// </summary>
public class IpServerTransport : IServerTransport
{
private const int MessageSize = 512;
private const int HeaderSize = 2;

private static readonly IPEndPoint Broadcast = new IPEndPoint(IPAddress.Broadcast, Protocol.PublishSubscribePort);

private readonly TcpListener _listener;
private readonly UdpClient _udpClient;

private readonly CancellationTokenSource _listenerCanceller = new CancellationTokenSource();
private readonly Thread _listenThread;

private byte _messageNumber;

private Func<object, Memory<byte>, Memory<byte>>? _callback;

public IpServerTransport() : this(IPAddress.Any)
{
}

public event EventHandler<ErrorEventArgs>? Error;

public IpServerTransport(IPAddress listenAddress)
{
_listener = new TcpListener(listenAddress, Protocol.RequestResponsePort);
_listener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);

_udpClient = new UdpClient(Protocol.PublishSubscribePort, AddressFamily.InterNetwork);
_udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);

_listenThread = new Thread(RunListener)
{
IsBackground = true,
};
}

public void Dispose()
{
_listenerCanceller.Cancel();
_listener.Stop();
_listener.Dispose();
_udpClient.Dispose();
}

public ValueTask ListenAsync(Func<object, Memory<byte>, Memory<byte>> callback)
{
_callback = callback;
_listenThread.Start();
return ValueTask.CompletedTask;
}

public void PublishControl(ushort id, byte[] buffer)
{
throw new NotImplementedException();
}

public void PublishReport(byte[] buffer)
{
int written = 0;
MessageType messageType;
Span<byte> source = buffer;
Span<byte> frame = stackalloc byte[MessageSize + HeaderSize];
byte sequenceNumber = 0;
while (written < buffer.Length)
{
int size = Math.Min(MessageSize, buffer.Length - written);
messageType = written == 0 ?
(buffer.Length < MessageSize ?
MessageType.Single : MessageType.First) :
(buffer.Length - written <= MessageSize) ?
MessageType.Last : MessageType.Consecutive;
frame[0] = (byte)((byte)messageType | (sequenceNumber++ & 0x0F));
unchecked
{
frame[1] = _messageNumber++;
}

Span<byte> slice = source.Slice(written, size);
slice.CopyTo(frame.Slice(HeaderSize));
_udpClient.Send(frame.Slice(0, HeaderSize + size), Broadcast);
written += size;
}
}

private async void RunListener()
{
_listener.Start();

while (!_listenerCanceller.IsCancellationRequested)
{
try
{
TcpClient client = await _listener.AcceptTcpClientAsync(_listenerCanceller.Token);
Task.Run(() => HandleRequest(client, _listenerCanceller.Token)).GetAwaiter();
}
catch (OperationCanceledException)
{
}
}
}

private async Task HandleRequest(TcpClient client, CancellationToken cancellationToken)
{
Memory<byte> buffer = new byte[8192];
using (client)
{
try
{
await using NetworkStream stream = client.GetStream();
int read;
while ((read = await stream.ReadAsync(buffer, cancellationToken)) != 0)
{
Memory<byte> response;
try
{
response = _callback!(client.Client.RemoteEndPoint!, buffer.Slice(0, read));
}
catch (Exception ex)
{
Error?.Invoke(this, new ErrorEventArgs(ex));
response = new byte[] { (byte)ThingSetStatus.InternalServerError };
}
await stream.WriteAsync(response, cancellationToken);
}
}
catch (Exception ex)
{
Error?.Invoke(this, new ErrorEventArgs(ex));
}
}
}
}
22 changes: 22 additions & 0 deletions src/ThingSet.Common.Transports.Ip/Protocol.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2025 Brill Power.
*
* SPDX-License-Identifier: Apache-2.0
*/
namespace ThingSet.Common.Transports.Ip;

public class Protocol
{
public const int RequestResponsePort = 9001;
public const int PublishSubscribePort = 9002;

private const int MessageTypePosition = 4;

internal enum MessageType
{
First = 0x0 << MessageTypePosition,
Consecutive = 0x1 << MessageTypePosition,
Last = 0x2 << MessageTypePosition,
Single = 0x3 << MessageTypePosition,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<ProjectUrl>https://github.com/Brill-Power/ThingSet.Net</ProjectUrl>
<RepositoryUrl>https://github.com/Brill-Power/ThingSet.Net.git</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<Description>IP transport for .NET ThingSet client</Description>
<Description>IP transport for .NET ThingSet client and server</Description>
<PackageId>ThingSet.Common.Transports.Ip</PackageId>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<PackageReadmeFile>README.md</PackageReadmeFile>
Expand Down
2 changes: 1 addition & 1 deletion src/ThingSet.Common/Protocols/Binary/CborDeserialiser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public static class CborDeserialiser
case CborReaderState.UnsignedInteger:
return ReadInteger(reader, state);
case CborReaderState.HalfPrecisionFloat:
return reader.ReadHalf();
return (float)reader.ReadHalf();
case CborReaderState.SinglePrecisionFloat:
return reader.ReadSingle();
case CborReaderState.DoublePrecisionFloat:
Expand Down
3 changes: 3 additions & 0 deletions src/ThingSet.Common/Protocols/Binary/CborSerialiser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public static void Write(CborWriter writer, object? value)
case Array a:
WriteArray(writer, a);
break;
case Enum e:
Write(writer, Convert.ChangeType(value, value.GetType().GetEnumUnderlyingType()));
break;
default:
if (value is null)
{
Expand Down
3 changes: 1 addition & 2 deletions src/ThingSet.Common/ThingSet.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="JsonPointer.Net" Version="5.3.0" />
<PackageReference Include="System.Formats.Cbor" Version="8.0.0" />
<PackageReference Include="System.Formats.Cbor" Version="9.0.9" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading