From 99700522f21f6a06d96008646ddaadc398e74c97 Mon Sep 17 00:00:00 2001 From: Charles Eckman Date: Sun, 30 Mar 2025 20:37:51 -0400 Subject: [PATCH] Attempted split / rewrite of ntcp --- not_tcp/not_tcp.py | 370 +++++++++++++++++++++++----------------- not_tcp/not_tcp_test.py | 8 +- 2 files changed, 221 insertions(+), 157 deletions(-) diff --git a/not_tcp/not_tcp.py b/not_tcp/not_tcp.py index 84ef782..15c032e 100644 --- a/not_tcp/not_tcp.py +++ b/not_tcp/not_tcp.py @@ -6,7 +6,7 @@ """ -from amaranth import Module, Signal, unsigned, Const +from amaranth import Module, Signal, unsigned, Const, Assert from amaranth.lib.wiring import Component, In, Out, Signature, connect from amaranth.lib import stream from amaranth.lib.data import UnionLayout, Struct @@ -89,137 +89,69 @@ def elaborate(self, platform): return m -class StreamStop(Component): +class InboundStop(Component): """ - A stop for a Not TCP stream on the local bus. - - NOTE: the Not TCP bus currently uses broadcast sends; it is not a ring bus. + Inbound half of an nTCP stop. Parameters - ---------- - stream_id: stream ID to match / generate packets with + --------- + stream_id - Attributes - ---------- - session: Inner interface - bus: Bus interface - connected: Debug/test interface indicating connection status. - Testonly, for now. + Attributes: + ---------- + stop: inbound session interface + accepted: "active" from the outbound direction + connected: indicator that the session has connected in at least one direction + upstream: inbound bus interface """ - stop: Out(session.BidiSessionSignature()) - bus: In(BusStopSignature()) + stop: Out(session.SessionSignature()) + accepted: In(1) connected: Out(1) + bus: In(stream.Signature(8)) def __init__(self, stream_id): super().__init__() - self._stream_id = stream_id + self._stream_id = Const(stream_id) def elaborate(self, platform): m = Module() - - # Each of these is big enough to buffer one full packet. + connected = self.connected input_buffer = m.submodules.input_buffer = SyncFIFOBuffered( width=8, depth=256) - output_buffer = m.submodules.output_buffer = SyncFIFOBuffered( - width=8, depth=256) + m.d.comb += [ + self.stop.data.payload.eq(input_buffer.r_stream.payload), + self.stop.data.valid.eq(input_buffer.r_stream.valid), + input_buffer.r_stream.ready.eq(self.stop.data.ready), + ] - connect(m, self.stop.outbound.data, output_buffer.w_stream) - connect(m, input_buffer.r_stream, self.stop.inbound.data) input_limiter = m.submodules.input_limiter = LimitForwarder( width=8, max_count=256) - output_limiter = m.submodules.output_limiter = LimitForwarder( - width=8, max_count=256) # Default state: don't transfer any data. m.d.comb += [ - self.stop.inbound.active.eq(0), input_limiter.start.eq(0), - output_limiter.start.eq(0), ] connect(m, input_limiter.outbound, input_buffer.w_stream) - connect(m, output_buffer.r_stream, output_limiter.inbound) flags_layout = UnionLayout({"bytes": unsigned(8), "flags": Flags}) - # Header from inbound packet: read_len = Signal(8) read_flags = Signal(flags_layout) - stream = Signal(8) + read_stream = Signal(8) - # Flags for outbound packet: - send_flags = Signal(flags_layout) - m.d.comb += send_flags.flags.to_host.eq(1) - sent_start = Signal(1) - sent_end = Signal(1) - - # Connection-state handling: - m.d.comb += [self.stop.inbound.active.eq(0), - self.connected.eq(0)] - with m.FSM(name="connection"): - with m.State("closed"): - m.next = "closed" - with m.If( - read_flags.flags.start & - (stream == Const(self._stream_id))): - m.d.sync += sent_start.eq(0) - m.d.sync += sent_end.eq(0) - m.next = "requested" - with m.State("requested"): - m.next = "requested" - m.d.comb += [self.stop.inbound.active.eq(1)] - with m.If(self.stop.outbound.active): - m.next = "open" - with m.State("open"): - m.next = "open" - m.d.comb += [ - self.stop.inbound.active.eq(1), - self.connected.eq(1), - ] - with m.If(sent_end): - m.next = "server-done" - with m.Elif( - read_flags.flags.end & - (stream == Const(self._stream_id))): - # Client has marked end-of-stream. - # Consume the input buffer. - m.next = "client-done" - with m.State("client-done"): - m.next = "client-done" - m.d.comb += [self.connected.eq(1)] - with m.If(sent_end): - # Server is also done, and flushed. - m.next = "flush" - with m.State("server-done"): - m.next = "server-done" - m.d.comb += [self.connected.eq(1), - self.stop.inbound.active.eq(1)] - with m.If( - ~read_flags.flags.end & - (stream == Const(self._stream_id))): - m.next = "flush" - with m.State("flush"): - m.next = "flush" - m.d.comb += [self.connected.eq(1)] - with m.If( - (read_len == Const(0)) & - (input_buffer.r_level == Const(0)) & - (output_buffer.r_level == Const(0)) - ): - # All data processing done. - m.next = "closed" - # END of connection-state FSM + this_stop = read_stream == self._stream_id with m.FSM(name="read"): - bus = self.bus.upstream + bus = self.bus with m.State("read-stream"): m.next = "read-stream" m.d.comb += bus.ready.eq(1) + m.d.sync += read_flags.eq(0) + m.d.sync += read_len.eq(0) with m.If(bus.valid): - m.d.sync += stream.eq(bus.payload) - # Zero the flags, so we don't get a false start/end - m.d.sync += read_flags.eq(0) + m.d.sync += read_stream.eq(bus.payload) m.next = "read-len" with m.State("read-len"): m.next = "read-len" @@ -231,32 +163,49 @@ def elaborate(self, platform): m.next = "read-flags" m.d.comb += bus.ready.eq(1) with m.If(bus.valid): - # At the cycle edge, capture the flags byte... m.d.sync += read_flags.bytes.eq(bus.payload) - with m.If(stream == Const(self._stream_id)): - # maybe block until accepted. - # TODO: This introduces an extra cycle of delay - # when we're "already connected", - # but keeps the logic blocks simpler. - m.next = "await-accept" + is_start = flags_layout(bus.payload).flags.start + + # If the packet is for this channel + # and we're already connected or it's a start panic, + # handle it in the local path + with m.If(this_stop & (connected | is_start)): + with m.If(is_start): + m.d.sync += self.stop.active.eq(1) + m.next = "await-accept" + with m.Else(): + m.d.comb += input_limiter.count.eq(read_len) + m.d.comb += input_limiter.start.eq(1) + m.next = "read-body" + # Otherwise, ignore the packet. with m.Else(): - # If this isn't for our stream, proceed to read - # (and discard) + # TODO: Forward data to downstream stop if stream + # doesn't match. + # For now: if this isn't for our stream, + # proceed to read (and discard) m.d.comb += input_limiter.count.eq(read_len) m.d.comb += input_limiter.start.eq(1) m.next = "read-body" with m.State("await-accept"): m.next = "await-accept" - with m.If(self.connected): - # trigger the input-limiter to begin starting with - # the byte following that. + # Flags handling. We only do this if we've matched the stop ID. + m.d.comb += Assert(this_stop) + m.d.comb += Assert(read_flags.flags.start) + + with m.If(self.accepted): + m.d.sync += connected.eq(1) m.d.comb += input_limiter.count.eq(read_len) m.d.comb += input_limiter.start.eq(1) m.next = "read-body" + with m.State("read-body"): m.next = "read-body" - connect(m, self.bus.upstream, input_limiter.inbound) - with m.If(stream != Const(self._stream_id)): + m.d.comb += [ + input_limiter.inbound.payload.eq(self.bus.payload), + input_limiter.inbound.valid.eq(self.bus.valid), + self.bus.ready.eq(input_limiter.inbound.ready), + ] + with m.If(read_stream != self._stream_id): # Disconnect from the input buffer, just discard the data: m.d.comb += [ input_buffer.w_stream.valid.eq(0), @@ -264,72 +213,187 @@ def elaborate(self, platform): ] m.d.comb += input_limiter.start.eq(0) with m.If(input_limiter.done): - # Clear the input from the last packet, so we don't - # think that we have one pending until we read it again. - m.d.sync += [read_len.eq(0), stream.eq(self._stream_id+1)] m.next = "read-stream" + m.d.sync += [read_len.eq(0), read_stream.eq(0)] + with m.If(read_flags.flags.end): + m.d.sync += [ + self.stop.active.eq(0), + connected.eq(0) + ] + + return m + + +class OutboundStop(Component): + stop: In(session.SessionSignature()) + bus: Out(stream.Signature(8)) + connected: Out(1) + + def __init__(self, stream_id): + super().__init__() + self._stream_id = Const(stream_id) + + def elaborate(self, platform): + m = Module() + + # Each of these is big enough to buffer one full packet. + output_buffer = m.submodules.output_buffer = SyncFIFOBuffered( + width=8, depth=256) + + m.d.comb += [ + output_buffer.w_stream.payload.eq(self.stop.data.payload), + output_buffer.w_stream.valid.eq(self.stop.data.valid), + self.stop.data.ready.eq(output_buffer.w_stream.ready), + ] + + output_limiter = m.submodules.output_limiter = LimitForwarder( + width=8, max_count=256) + + # Default state: don't transfer any data. + m.d.comb += [ + output_limiter.start.eq(0), + self.bus.valid.eq(0) + ] + connect(m, output_buffer.r_stream, output_limiter.inbound) + + flags_layout = UnionLayout({"bytes": unsigned(8), "flags": Flags}) + + # Flags for outbound packet: + send_flags = Signal(flags_layout) + m.d.sync += send_flags.flags.to_host.eq(1) + send_len = Signal(8) # Cases in which we want to send a packet: - data_to_send = output_buffer.r_level > 0 - not_yet_started = self.connected & ~sent_start - not_yet_ended = (self.connected & - ~self.stop.outbound.active & ~sent_end) with m.FSM(name="write"): - bus = self.bus.downstream - write_len = Signal(8) + with m.State("disconnected"): + m.next = "disconnected" + with m.If(self.stop.active): + # Immediately send a "start" packet. + m.d.sync += send_flags.flags.start.eq(1) + m.d.sync += send_flags.flags.end.eq(0) + m.d.sync += self.connected.eq(1) + m.next = "write-stream" with m.State("write-stream"): m.next = "write-stream" - with m.If(data_to_send | not_yet_started | not_yet_ended): - # We're ready to start sending... - m.d.comb += bus.payload.eq(self._stream_id) - m.d.comb += bus.valid.eq(1) + with m.If( + send_flags.flags.start + | ~self.stop.active + | output_buffer.level > 0): + # Start sending. + m.d.comb += self.bus.payload.eq(self._stream_id) + m.d.comb += self.bus.valid.eq(1) # Lock in the level as the length of this packet. # We may send a short (zero-length) packet # to start or end the connection. - m.d.sync += write_len.eq(output_buffer.r_level) - with m.If(bus.ready): - # Only transition if we are ready-to-send. + m.d.sync += send_len.eq(output_buffer.r_level) + # We send an explicit empty END packet. + m.d.sync += send_flags.flags.end.eq( + ~self.stop.active & + (output_buffer.r_level == Const(0))) + with m.If(self.bus.ready): m.next = "write-len" - with m.Else(): - m.d.comb += bus.valid.eq(0) with m.State("write-len"): m.next = "write-len" - # Write the length. - m.d.comb += bus.payload.eq(write_len) - m.d.comb += bus.valid.eq(1) - with m.If(bus.ready): + m.d.comb += self.bus.payload.eq(send_len) + m.d.comb += self.bus.valid.eq(1) + with m.If(self.bus.ready): m.next = "write-flags" with m.State("write-flags"): m.next = "write-flags" - # Send the "end" bit if: - # - we haven't yet sent one, - # - the server has closed the connection, and - # - we have sent all the data from the server - # This generates an extra empty "end" packet- - # noisy, but correct. - send_end = not_yet_ended & (output_buffer.r_level == 0) - m.d.comb += [ - send_flags.flags.start.eq(~sent_start), - send_flags.flags.end.eq(send_end), - bus.payload.eq(send_flags.bytes), - bus.valid.eq(1), + self.bus.payload.eq(send_flags.bytes), + self.bus.valid.eq(1), ] - m.d.comb += [ - output_limiter.count.eq(write_len), - output_limiter.start.eq(1), - ] - with m.If(bus.ready): - m.d.sync += [ - sent_start.eq(1), - sent_end.eq(send_end), + with m.If(self.bus.ready): + m.d.comb += [ + output_limiter.count.eq(send_len), + output_limiter.start.eq(1), ] m.next = "write-body" with m.State("write-body"): m.next = "write-body" - connect(m, output_limiter.outbound, bus) + m.d.comb += [ + self.bus.payload.eq(output_limiter.outbound.payload), + self.bus.valid.eq(output_limiter.outbound.valid), + output_limiter.outbound.ready.eq(self.bus.ready), + ] + with m.If(output_limiter.done): - m.next = "write-stream" + m.d.sync += [ + send_flags.flags.start.eq(0), + send_flags.flags.end.eq(0), + ] + with m.If(send_flags.flags.end): + m.d.sync += self.connected.eq(0) + m.next = "disconnected" + with m.Else(): + m.next = "write-stream" + + return m + + +class StreamStop(Component): + """ + A stop for a Not TCP stream on the local bus. + + NOTE: the Not TCP bus currently uses broadcast sends; it is not a ring bus. + + Parameters + ---------- + stream_id: stream ID to match / generate packets with + + Attributes + ---------- + stop: Inner interface + bus: Bus interface + + connected: Debug/test interface indicating connection status. + Testonly, for now. + """ + + stop: Out(session.BidiSessionSignature()) + bus: In(BusStopSignature()) + connected: Out(1) + + def __init__(self, stream_id): + super().__init__() + self._stream_id = stream_id + + def elaborate(self, platform): + m = Module() + + inbound_inner = m.submodules.inbound = InboundStop(self._stream_id) + inbound_outer = self.stop.inbound + outbound_inner = m.submodules.outbound = OutboundStop(self._stream_id) + outbound_outer = self.stop.outbound + + m.d.comb += [ + self.connected.eq( + inbound_inner.connected | outbound_inner.connected + ), + + inbound_outer.active.eq(inbound_inner.stop.active), + inbound_outer.data.payload.eq(inbound_inner.stop.data.payload), + inbound_outer.data.valid.eq(inbound_inner.stop.data.valid), + inbound_inner.stop.data.ready.eq(inbound_outer.data.ready), + + outbound_inner.stop.active.eq(outbound_outer.active), + outbound_inner.stop.data.payload.eq( + outbound_outer.data.payload), + outbound_inner.stop.data.valid.eq(outbound_outer.data.valid), + outbound_outer.data.ready.eq(outbound_inner.stop.data.ready), + + inbound_inner.accepted.eq(outbound_inner.stop.active), + + inbound_inner.bus.payload.eq(self.bus.upstream.payload), + inbound_inner.bus.valid.eq(self.bus.upstream.valid), + self.bus.upstream.ready.eq(inbound_inner.bus.ready), + + self.bus.downstream.payload.eq(outbound_inner.bus.payload), + self.bus.downstream.valid.eq(outbound_inner.bus.valid), + outbound_inner.bus.ready.eq(self.bus.downstream.ready), + ] + return m diff --git a/not_tcp/not_tcp_test.py b/not_tcp/not_tcp_test.py index 6ea2c57..5bfa88c 100644 --- a/not_tcp/not_tcp_test.py +++ b/not_tcp/not_tcp_test.py @@ -64,8 +64,7 @@ async def driver(ctx): sim.add_testbench(driver) sim.add_clock(1e-6) - with sim.write_vcd("testout.vcd"): - sim.run() + sim.run() # After simulation is complete... # The stop should have received all the packets for this stream: @@ -77,9 +76,10 @@ async def driver(ctx): rcvd = collect_bus.body packets = [] while len(rcvd) > 0: + # All data should be packetized. (p, remainder) = Packet.from_bytes(rcvd) - if p is not None: - packets += [p] + assert p is not None, f"remaining data: {rcvd}" + packets += [p] rcvd = remainder bodies = bytes() for i in range(len(packets)):