From f86e46d8069109d9ee44318138bdf261e4876c74 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 17:38:45 +0200 Subject: [PATCH 01/17] make Failure an exception --- CHANGELOG.md | 1 + src/Failure.php | 134 ++++++++++++++++-------------- src/Failure/ClosedByServer.php | 5 +- src/Failure/ClosedBySignal.php | 5 +- src/Failure/ToAck.php | 6 +- src/Failure/ToAdjustQos.php | 6 +- src/Failure/ToBind.php | 5 +- src/Failure/ToCancel.php | 6 +- src/Failure/ToCloseChannel.php | 6 +- src/Failure/ToCloseConnection.php | 6 +- src/Failure/ToCommit.php | 6 +- src/Failure/ToConsume.php | 5 +- src/Failure/ToDeclareExchange.php | 5 +- src/Failure/ToDeclareQueue.php | 5 +- src/Failure/ToDeleteExchange.php | 5 +- src/Failure/ToDeleteQueue.php | 5 +- src/Failure/ToGet.php | 5 +- src/Failure/ToOpenChannel.php | 6 +- src/Failure/ToOpenConnection.php | 6 +- src/Failure/ToPublish.php | 5 +- src/Failure/ToPurge.php | 5 +- src/Failure/ToReadFrame.php | 6 +- src/Failure/ToReadMessage.php | 6 +- src/Failure/ToRecover.php | 6 +- src/Failure/ToReject.php | 6 +- src/Failure/ToRollback.php | 6 +- src/Failure/ToSelect.php | 6 +- src/Failure/ToSendFrame.php | 6 +- src/Failure/ToUnbind.php | 5 +- src/Failure/UnexpectedFrame.php | 6 +- 30 files changed, 130 insertions(+), 161 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 45b21fd..579c82f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Requires `innmind/foundation:~1.5` - `Innmind\AMQP\Factory::make()` timeout argument is now expressed via `Innmind\TimeContinuum\Period` - `Innmind\AMQP\Model\Basic\Message` expiration is now expressed via `Innmind\TimeContinuum\Period` +- `Innmind\AMQP\Failure` is now an exception that wraps each possible failure object ### Fixed diff --git a/src/Failure.php b/src/Failure.php index d11c2f5..0950252 100644 --- a/src/Failure.php +++ b/src/Failure.php @@ -7,165 +7,167 @@ use Innmind\Signals\Signal; use Innmind\Immutable\Maybe; -/** - * @psalm-immutable - */ -abstract class Failure +final class Failure extends Exception\RuntimeException { + private function __construct( + private object $failure, + ) { + } + #[\NoDiscard] - final public static function toOpenConnection(): self + public static function toOpenConnection(): self { - return new Failure\ToOpenConnection; + return new self(new Failure\ToOpenConnection); } #[\NoDiscard] - final public static function toOpenChannel(): self + public static function toOpenChannel(): self { - return new Failure\ToOpenChannel; + return new self(new Failure\ToOpenChannel); } #[\NoDiscard] - final public static function toCloseChannel(): self + public static function toCloseChannel(): self { - return new Failure\ToCloseChannel; + return new self(new Failure\ToCloseChannel); } #[\NoDiscard] - final public static function toCloseConnection(): self + public static function toCloseConnection(): self { - return new Failure\ToCloseConnection; + return new self(new Failure\ToCloseConnection); } #[\NoDiscard] - final public static function toDeleteQueue(Model\Queue\Deletion $command): self + public static function toDeleteQueue(Model\Queue\Deletion $command): self { - return new Failure\ToDeleteQueue($command); + return new self(new Failure\ToDeleteQueue($command)); } #[\NoDiscard] - final public static function toDeleteExchange(Model\Exchange\Deletion $command): self + public static function toDeleteExchange(Model\Exchange\Deletion $command): self { - return new Failure\ToDeleteExchange($command); + return new self(new Failure\ToDeleteExchange($command)); } #[\NoDiscard] - final public static function toDeclareQueue(Model\Queue\Declaration $command): self + public static function toDeclareQueue(Model\Queue\Declaration $command): self { - return new Failure\ToDeclareQueue($command); + return new self(new Failure\ToDeclareQueue($command)); } #[\NoDiscard] - final public static function toDeclareExchange(Model\Exchange\Declaration $command): self + public static function toDeclareExchange(Model\Exchange\Declaration $command): self { - return new Failure\ToDeclareExchange($command); + return new self(new Failure\ToDeclareExchange($command)); } #[\NoDiscard] - final public static function toBind(Model\Queue\Binding $command): self + public static function toBind(Model\Queue\Binding $command): self { - return new Failure\ToBind($command); + return new self(new Failure\ToBind($command)); } #[\NoDiscard] - final public static function toUnbind(Model\Queue\Unbinding $command): self + public static function toUnbind(Model\Queue\Unbinding $command): self { - return new Failure\ToUnbind($command); + return new self(new Failure\ToUnbind($command)); } #[\NoDiscard] - final public static function toPurge(Model\Queue\Purge $command): self + public static function toPurge(Model\Queue\Purge $command): self { - return new Failure\ToPurge($command); + return new self(new Failure\ToPurge($command)); } #[\NoDiscard] - final public static function toAdjustQos(): self + public static function toAdjustQos(): self { - return new Failure\ToAdjustQos; + return new self(new Failure\ToAdjustQos); } #[\NoDiscard] - final public static function toPublish(Model\Basic\Publish $command): self + public static function toPublish(Model\Basic\Publish $command): self { - return new Failure\ToPublish($command); + return new self(new Failure\ToPublish($command)); } #[\NoDiscard] - final public static function toGet(Model\Basic\Get $command): self + public static function toGet(Model\Basic\Get $command): self { - return new Failure\ToGet($command); + return new self(new Failure\ToGet($command)); } #[\NoDiscard] - final public static function toConsume(Model\Basic\Consume $command): self + public static function toConsume(Model\Basic\Consume $command): self { - return new Failure\ToConsume($command); + return new self(new Failure\ToConsume($command)); } #[\NoDiscard] - final public static function toAck(string $queue): self + public static function toAck(string $queue): self { - return new Failure\ToAck($queue); + return new self(new Failure\ToAck($queue)); } #[\NoDiscard] - final public static function toReject(string $queue): self + public static function toReject(string $queue): self { - return new Failure\ToReject($queue); + return new self(new Failure\ToReject($queue)); } #[\NoDiscard] - final public static function toCancel(string $queue): self + public static function toCancel(string $queue): self { - return new Failure\ToCancel($queue); + return new self(new Failure\ToCancel($queue)); } #[\NoDiscard] - final public static function toRecover(string $queue): self + public static function toRecover(string $queue): self { - return new Failure\ToRecover($queue); + return new self(new Failure\ToRecover($queue)); } #[\NoDiscard] - final public static function toSelect(): self + public static function toSelect(): self { - return new Failure\ToSelect; + return new self(new Failure\ToSelect); } #[\NoDiscard] - final public static function toCommit(): self + public static function toCommit(): self { - return new Failure\ToCommit; + return new self(new Failure\ToCommit); } #[\NoDiscard] - final public static function toRollback(): self + public static function toRollback(): self { - return new Failure\ToRollback; + return new self(new Failure\ToRollback); } #[\NoDiscard] - final public static function toSendFrame(): self + public static function toSendFrame(): self { - return new Failure\ToSendFrame; + return new self(new Failure\ToSendFrame); } #[\NoDiscard] - final public static function toReadFrame(): self + public static function toReadFrame(): self { - return new Failure\ToReadFrame; + return new self(new Failure\ToReadFrame); } #[\NoDiscard] - final public static function toReadMessage(): self + public static function toReadMessage(): self { - return new Failure\ToReadMessage; + return new self(new Failure\ToReadMessage); } #[\NoDiscard] - final public static function unexpectedFrame(): self + public static function unexpectedFrame(): self { - return new Failure\UnexpectedFrame; + return new self(new Failure\UnexpectedFrame); } /** @@ -173,20 +175,30 @@ final public static function unexpectedFrame(): self * @param Maybe $method */ #[\NoDiscard] - final public static function closedByServer( + public static function closedByServer( string $message, int $code, Maybe $method, ): self { - return new Failure\ClosedByServer($message, $code, $method); + return new self(new Failure\ClosedByServer($message, $code, $method)); + } + + #[\NoDiscard] + public static function closedBySignal(Signal $signal): self + { + return new self(new Failure\ClosedBySignal($signal)); } #[\NoDiscard] - final public static function closedBySignal(Signal $signal): self + public function unwrap(): object { - return new Failure\ClosedBySignal($signal); + return $this->failure; } #[\NoDiscard] - abstract public function kind(): Failure\Kind; + public function kind(): Failure\Kind + { + /** @psalm-suppress MixedMethodCall,MixedReturnStatement */ + return $this->failure->kind(); + } } diff --git a/src/Failure/ClosedByServer.php b/src/Failure/ClosedByServer.php index 975ce79..d873af2 100644 --- a/src/Failure/ClosedByServer.php +++ b/src/Failure/ClosedByServer.php @@ -4,7 +4,6 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Transport\Frame\Method, }; use Innmind\Immutable\Maybe; @@ -12,7 +11,7 @@ /** * @psalm-immutable */ -final class ClosedByServer extends Failure +final class ClosedByServer { private string $message; /** @var int<0, 65535> */ @@ -33,7 +32,7 @@ public function __construct(string $message, int $code, Maybe $method) $this->method = $method; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::closedByServer; diff --git a/src/Failure/ClosedBySignal.php b/src/Failure/ClosedBySignal.php index 7f31f86..cc4f933 100644 --- a/src/Failure/ClosedBySignal.php +++ b/src/Failure/ClosedBySignal.php @@ -3,13 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; use Innmind\Signals\Signal; /** * @psalm-immutable */ -final class ClosedBySignal extends Failure +final class ClosedBySignal { private Signal $signal; @@ -21,7 +20,7 @@ public function __construct(Signal $signal) $this->signal = $signal; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::closedBySignal; diff --git a/src/Failure/ToAck.php b/src/Failure/ToAck.php index 31a39f1..e822d67 100644 --- a/src/Failure/ToAck.php +++ b/src/Failure/ToAck.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToAck extends Failure +final class ToAck { private string $queue; @@ -20,7 +18,7 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toAck; diff --git a/src/Failure/ToAdjustQos.php b/src/Failure/ToAdjustQos.php index 9d42910..ed3bc9a 100644 --- a/src/Failure/ToAdjustQos.php +++ b/src/Failure/ToAdjustQos.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToAdjustQos extends Failure +final class ToAdjustQos { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toAdjustQos; diff --git a/src/Failure/ToBind.php b/src/Failure/ToBind.php index 0cf4c16..ecffa64 100644 --- a/src/Failure/ToBind.php +++ b/src/Failure/ToBind.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Queue\Binding as Command, }; /** * @psalm-immutable */ -final class ToBind extends Failure +final class ToBind { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toBind; diff --git a/src/Failure/ToCancel.php b/src/Failure/ToCancel.php index 1ba3fe4..f9c9ad9 100644 --- a/src/Failure/ToCancel.php +++ b/src/Failure/ToCancel.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToCancel extends Failure +final class ToCancel { private string $queue; @@ -20,7 +18,7 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toCancel; diff --git a/src/Failure/ToCloseChannel.php b/src/Failure/ToCloseChannel.php index 854d538..8f7585f 100644 --- a/src/Failure/ToCloseChannel.php +++ b/src/Failure/ToCloseChannel.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToCloseChannel extends Failure +final class ToCloseChannel { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toCloseChannel; diff --git a/src/Failure/ToCloseConnection.php b/src/Failure/ToCloseConnection.php index 3056ed0..441db5d 100644 --- a/src/Failure/ToCloseConnection.php +++ b/src/Failure/ToCloseConnection.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToCloseConnection extends Failure +final class ToCloseConnection { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toCloseConnection; diff --git a/src/Failure/ToCommit.php b/src/Failure/ToCommit.php index 0ea5c7a..f4dd01b 100644 --- a/src/Failure/ToCommit.php +++ b/src/Failure/ToCommit.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToCommit extends Failure +final class ToCommit { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toCommit; diff --git a/src/Failure/ToConsume.php b/src/Failure/ToConsume.php index 2757c38..92905f9 100644 --- a/src/Failure/ToConsume.php +++ b/src/Failure/ToConsume.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Basic\Consume as Command, }; /** * @psalm-immutable */ -final class ToConsume extends Failure +final class ToConsume { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toConsume; diff --git a/src/Failure/ToDeclareExchange.php b/src/Failure/ToDeclareExchange.php index 52c2cb5..eda19a5 100644 --- a/src/Failure/ToDeclareExchange.php +++ b/src/Failure/ToDeclareExchange.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Exchange\Declaration as Command, }; /** * @psalm-immutable */ -final class ToDeclareExchange extends Failure +final class ToDeclareExchange { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toDeclareExchange; diff --git a/src/Failure/ToDeclareQueue.php b/src/Failure/ToDeclareQueue.php index b7fe4cb..818b110 100644 --- a/src/Failure/ToDeclareQueue.php +++ b/src/Failure/ToDeclareQueue.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Queue\Declaration as Command, }; /** * @psalm-immutable */ -final class ToDeclareQueue extends Failure +final class ToDeclareQueue { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toDeclareQueue; diff --git a/src/Failure/ToDeleteExchange.php b/src/Failure/ToDeleteExchange.php index 8d4990b..dc262d9 100644 --- a/src/Failure/ToDeleteExchange.php +++ b/src/Failure/ToDeleteExchange.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Exchange\Deletion as Command, }; /** * @psalm-immutable */ -final class ToDeleteExchange extends Failure +final class ToDeleteExchange { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toDeleteExchange; diff --git a/src/Failure/ToDeleteQueue.php b/src/Failure/ToDeleteQueue.php index 35b618c..0ddec3d 100644 --- a/src/Failure/ToDeleteQueue.php +++ b/src/Failure/ToDeleteQueue.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Queue\Deletion as Command, }; /** * @psalm-immutable */ -final class ToDeleteQueue extends Failure +final class ToDeleteQueue { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toDeleteQueue; diff --git a/src/Failure/ToGet.php b/src/Failure/ToGet.php index 0d708f3..9e91201 100644 --- a/src/Failure/ToGet.php +++ b/src/Failure/ToGet.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Basic\Get as Command, }; /** * @psalm-immutable */ -final class ToGet extends Failure +final class ToGet { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toGet; diff --git a/src/Failure/ToOpenChannel.php b/src/Failure/ToOpenChannel.php index dfefb3b..91c6bec 100644 --- a/src/Failure/ToOpenChannel.php +++ b/src/Failure/ToOpenChannel.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToOpenChannel extends Failure +final class ToOpenChannel { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toOpenChannel; diff --git a/src/Failure/ToOpenConnection.php b/src/Failure/ToOpenConnection.php index 8a5423c..a7ad378 100644 --- a/src/Failure/ToOpenConnection.php +++ b/src/Failure/ToOpenConnection.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToOpenConnection extends Failure +final class ToOpenConnection { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toOpenConnection; diff --git a/src/Failure/ToPublish.php b/src/Failure/ToPublish.php index a428060..3d7c188 100644 --- a/src/Failure/ToPublish.php +++ b/src/Failure/ToPublish.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Basic\Publish as Command, }; /** * @psalm-immutable */ -final class ToPublish extends Failure +final class ToPublish { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toPublish; diff --git a/src/Failure/ToPurge.php b/src/Failure/ToPurge.php index 384c2e4..9a20526 100644 --- a/src/Failure/ToPurge.php +++ b/src/Failure/ToPurge.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Queue\Purge as Command, }; /** * @psalm-immutable */ -final class ToPurge extends Failure +final class ToPurge { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toPurge; diff --git a/src/Failure/ToReadFrame.php b/src/Failure/ToReadFrame.php index 2450975..aff9fdb 100644 --- a/src/Failure/ToReadFrame.php +++ b/src/Failure/ToReadFrame.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToReadFrame extends Failure +final class ToReadFrame { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toReadFrame; diff --git a/src/Failure/ToReadMessage.php b/src/Failure/ToReadMessage.php index 31561c0..b1b13df 100644 --- a/src/Failure/ToReadMessage.php +++ b/src/Failure/ToReadMessage.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToReadMessage extends Failure +final class ToReadMessage { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toReadMessage; diff --git a/src/Failure/ToRecover.php b/src/Failure/ToRecover.php index f433926..f23cf63 100644 --- a/src/Failure/ToRecover.php +++ b/src/Failure/ToRecover.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToRecover extends Failure +final class ToRecover { private string $queue; @@ -20,7 +18,7 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toRecover; diff --git a/src/Failure/ToReject.php b/src/Failure/ToReject.php index 43ca45d..5497564 100644 --- a/src/Failure/ToReject.php +++ b/src/Failure/ToReject.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToReject extends Failure +final class ToReject { private string $queue; @@ -20,7 +18,7 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toReject; diff --git a/src/Failure/ToRollback.php b/src/Failure/ToRollback.php index 493c069..072e068 100644 --- a/src/Failure/ToRollback.php +++ b/src/Failure/ToRollback.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToRollback extends Failure +final class ToRollback { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toRollback; diff --git a/src/Failure/ToSelect.php b/src/Failure/ToSelect.php index a6fe971..ac361af 100644 --- a/src/Failure/ToSelect.php +++ b/src/Failure/ToSelect.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToSelect extends Failure +final class ToSelect { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toSelect; diff --git a/src/Failure/ToSendFrame.php b/src/Failure/ToSendFrame.php index 1a0a6d9..26b078a 100644 --- a/src/Failure/ToSendFrame.php +++ b/src/Failure/ToSendFrame.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToSendFrame extends Failure +final class ToSendFrame { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toSendFrame; diff --git a/src/Failure/ToUnbind.php b/src/Failure/ToUnbind.php index f237f6c..2f9d9f0 100644 --- a/src/Failure/ToUnbind.php +++ b/src/Failure/ToUnbind.php @@ -4,14 +4,13 @@ namespace Innmind\AMQP\Failure; use Innmind\AMQP\{ - Failure, Model\Queue\Unbinding as Command, }; /** * @psalm-immutable */ -final class ToUnbind extends Failure +final class ToUnbind { private Command $command; @@ -29,7 +28,7 @@ public function command(): Command return $this->command; } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::toUnbind; diff --git a/src/Failure/UnexpectedFrame.php b/src/Failure/UnexpectedFrame.php index bc57cfc..6d3be9a 100644 --- a/src/Failure/UnexpectedFrame.php +++ b/src/Failure/UnexpectedFrame.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class UnexpectedFrame extends Failure +final class UnexpectedFrame { /** * @internal @@ -17,7 +15,7 @@ public function __construct() { } - #[\Override] + #[\NoDiscard] public function kind(): Kind { return Kind::unexpectedFrame; From a079ba2fd1a7be7457431a6a416b334f816eb619 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 17:45:15 +0200 Subject: [PATCH 02/17] hold the failure kind inside Failure directly --- src/Failure.php | 144 +++++++++++++++++++++++------- src/Failure/ClosedByServer.php | 10 +-- src/Failure/ClosedBySignal.php | 6 -- src/Failure/ToAck.php | 6 -- src/Failure/ToAdjustQos.php | 6 -- src/Failure/ToBind.php | 10 +-- src/Failure/ToCancel.php | 6 -- src/Failure/ToCloseChannel.php | 6 -- src/Failure/ToCloseConnection.php | 6 -- src/Failure/ToCommit.php | 6 -- src/Failure/ToDeclareExchange.php | 10 +-- src/Failure/ToDeclareQueue.php | 10 +-- src/Failure/ToDeleteExchange.php | 10 +-- src/Failure/ToDeleteQueue.php | 10 +-- src/Failure/ToGet.php | 10 +-- src/Failure/ToOpenChannel.php | 6 -- src/Failure/ToOpenConnection.php | 6 -- src/Failure/ToPublish.php | 10 +-- src/Failure/ToPurge.php | 10 +-- src/Failure/ToReadFrame.php | 6 -- src/Failure/ToReadMessage.php | 6 -- src/Failure/ToRecover.php | 6 -- src/Failure/ToReject.php | 6 -- src/Failure/ToRollback.php | 6 -- src/Failure/ToSelect.php | 6 -- src/Failure/ToSendFrame.php | 6 -- src/Failure/ToUnbind.php | 10 +-- src/Failure/UnexpectedFrame.php | 6 -- 28 files changed, 124 insertions(+), 222 deletions(-) diff --git a/src/Failure.php b/src/Failure.php index 0950252..eff8923 100644 --- a/src/Failure.php +++ b/src/Failure.php @@ -11,163 +11,242 @@ final class Failure extends Exception\RuntimeException { private function __construct( private object $failure, + private Failure\Kind $kind, ) { } #[\NoDiscard] public static function toOpenConnection(): self { - return new self(new Failure\ToOpenConnection); + return new self( + new Failure\ToOpenConnection, + Failure\Kind::toOpenConnection, + ); } #[\NoDiscard] public static function toOpenChannel(): self { - return new self(new Failure\ToOpenChannel); + return new self( + new Failure\ToOpenChannel, + Failure\Kind::toOpenChannel, + ); } #[\NoDiscard] public static function toCloseChannel(): self { - return new self(new Failure\ToCloseChannel); + return new self( + new Failure\ToCloseChannel, + Failure\Kind::toCloseChannel, + ); } #[\NoDiscard] public static function toCloseConnection(): self { - return new self(new Failure\ToCloseConnection); + return new self( + new Failure\ToCloseConnection, + Failure\Kind::toCloseConnection, + ); } #[\NoDiscard] public static function toDeleteQueue(Model\Queue\Deletion $command): self { - return new self(new Failure\ToDeleteQueue($command)); + return new self( + new Failure\ToDeleteQueue($command), + Failure\Kind::toDeleteQueue, + ); } #[\NoDiscard] public static function toDeleteExchange(Model\Exchange\Deletion $command): self { - return new self(new Failure\ToDeleteExchange($command)); + return new self( + new Failure\ToDeleteExchange($command), + Failure\Kind::toDeleteExchange, + ); } #[\NoDiscard] public static function toDeclareQueue(Model\Queue\Declaration $command): self { - return new self(new Failure\ToDeclareQueue($command)); + return new self( + new Failure\ToDeclareQueue($command), + Failure\Kind::toDeclareQueue, + ); } #[\NoDiscard] public static function toDeclareExchange(Model\Exchange\Declaration $command): self { - return new self(new Failure\ToDeclareExchange($command)); + return new self( + new Failure\ToDeclareExchange($command), + Failure\Kind::toDeclareExchange, + ); } #[\NoDiscard] public static function toBind(Model\Queue\Binding $command): self { - return new self(new Failure\ToBind($command)); + return new self( + new Failure\ToBind($command), + Failure\Kind::toBind, + ); } #[\NoDiscard] public static function toUnbind(Model\Queue\Unbinding $command): self { - return new self(new Failure\ToUnbind($command)); + return new self( + new Failure\ToUnbind($command), + Failure\Kind::toUnbind, + ); } #[\NoDiscard] public static function toPurge(Model\Queue\Purge $command): self { - return new self(new Failure\ToPurge($command)); + return new self( + new Failure\ToPurge($command), + Failure\Kind::toPurge, + ); } #[\NoDiscard] public static function toAdjustQos(): self { - return new self(new Failure\ToAdjustQos); + return new self( + new Failure\ToAdjustQos, + Failure\Kind::toAdjustQos, + ); } #[\NoDiscard] public static function toPublish(Model\Basic\Publish $command): self { - return new self(new Failure\ToPublish($command)); + return new self( + new Failure\ToPublish($command), + Failure\Kind::toPublish, + ); } #[\NoDiscard] public static function toGet(Model\Basic\Get $command): self { - return new self(new Failure\ToGet($command)); + return new self( + new Failure\ToGet($command), + Failure\Kind::toGet, + ); } #[\NoDiscard] public static function toConsume(Model\Basic\Consume $command): self { - return new self(new Failure\ToConsume($command)); + return new self( + new Failure\ToConsume($command), + Failure\Kind::toConsume, + ); } #[\NoDiscard] public static function toAck(string $queue): self { - return new self(new Failure\ToAck($queue)); + return new self( + new Failure\ToAck($queue), + Failure\Kind::toAck, + ); } #[\NoDiscard] public static function toReject(string $queue): self { - return new self(new Failure\ToReject($queue)); + return new self( + new Failure\ToReject($queue), + Failure\Kind::toReject, + ); } #[\NoDiscard] public static function toCancel(string $queue): self { - return new self(new Failure\ToCancel($queue)); + return new self( + new Failure\ToCancel($queue), + Failure\Kind::toCancel, + ); } #[\NoDiscard] public static function toRecover(string $queue): self { - return new self(new Failure\ToRecover($queue)); + return new self( + new Failure\ToRecover($queue), + Failure\Kind::toRecover, + ); } #[\NoDiscard] public static function toSelect(): self { - return new self(new Failure\ToSelect); + return new self( + new Failure\ToSelect, + Failure\Kind::toSelect, + ); } #[\NoDiscard] public static function toCommit(): self { - return new self(new Failure\ToCommit); + return new self( + new Failure\ToCommit, + Failure\Kind::toCommit, + ); } #[\NoDiscard] public static function toRollback(): self { - return new self(new Failure\ToRollback); + return new self( + new Failure\ToRollback, + Failure\Kind::toRollback, + ); } #[\NoDiscard] public static function toSendFrame(): self { - return new self(new Failure\ToSendFrame); + return new self( + new Failure\ToSendFrame, + Failure\Kind::toSendFrame, + ); } #[\NoDiscard] public static function toReadFrame(): self { - return new self(new Failure\ToReadFrame); + return new self( + new Failure\ToReadFrame, + Failure\Kind::toReadFrame, + ); } #[\NoDiscard] public static function toReadMessage(): self { - return new self(new Failure\ToReadMessage); + return new self( + new Failure\ToReadMessage, + Failure\Kind::toReadMessage, + ); } #[\NoDiscard] public static function unexpectedFrame(): self { - return new self(new Failure\UnexpectedFrame); + return new self( + new Failure\UnexpectedFrame, + Failure\Kind::unexpectedFrame, + ); } /** @@ -180,13 +259,19 @@ public static function closedByServer( int $code, Maybe $method, ): self { - return new self(new Failure\ClosedByServer($message, $code, $method)); + return new self( + new Failure\ClosedByServer($message, $code, $method), + Failure\Kind::closedByServer, + ); } #[\NoDiscard] public static function closedBySignal(Signal $signal): self { - return new self(new Failure\ClosedBySignal($signal)); + return new self( + new Failure\ClosedBySignal($signal), + Failure\Kind::closedBySignal, + ); } #[\NoDiscard] @@ -198,7 +283,6 @@ public function unwrap(): object #[\NoDiscard] public function kind(): Failure\Kind { - /** @psalm-suppress MixedMethodCall,MixedReturnStatement */ - return $this->failure->kind(); + return $this->kind; } } diff --git a/src/Failure/ClosedByServer.php b/src/Failure/ClosedByServer.php index d873af2..7fa8c03 100644 --- a/src/Failure/ClosedByServer.php +++ b/src/Failure/ClosedByServer.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Transport\Frame\Method, -}; +use Innmind\AMQP\Transport\Frame\Method; use Innmind\Immutable\Maybe; /** @@ -32,12 +30,6 @@ public function __construct(string $message, int $code, Maybe $method) $this->method = $method; } - #[\NoDiscard] - public function kind(): Kind - { - return Kind::closedByServer; - } - #[\NoDiscard] public function message(): string { diff --git a/src/Failure/ClosedBySignal.php b/src/Failure/ClosedBySignal.php index cc4f933..0bf97e2 100644 --- a/src/Failure/ClosedBySignal.php +++ b/src/Failure/ClosedBySignal.php @@ -20,12 +20,6 @@ public function __construct(Signal $signal) $this->signal = $signal; } - #[\NoDiscard] - public function kind(): Kind - { - return Kind::closedBySignal; - } - #[\NoDiscard] public function signal(): Signal { diff --git a/src/Failure/ToAck.php b/src/Failure/ToAck.php index e822d67..ec61c19 100644 --- a/src/Failure/ToAck.php +++ b/src/Failure/ToAck.php @@ -18,12 +18,6 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toAck; - } - #[\NoDiscard] public function queue(): string { diff --git a/src/Failure/ToAdjustQos.php b/src/Failure/ToAdjustQos.php index ed3bc9a..5feff22 100644 --- a/src/Failure/ToAdjustQos.php +++ b/src/Failure/ToAdjustQos.php @@ -14,10 +14,4 @@ final class ToAdjustQos public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toAdjustQos; - } } diff --git a/src/Failure/ToBind.php b/src/Failure/ToBind.php index ecffa64..334f9cf 100644 --- a/src/Failure/ToBind.php +++ b/src/Failure/ToBind.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Queue\Binding as Command, -}; +use Innmind\AMQP\Model\Queue\Binding as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toBind; - } } diff --git a/src/Failure/ToCancel.php b/src/Failure/ToCancel.php index f9c9ad9..a6230a1 100644 --- a/src/Failure/ToCancel.php +++ b/src/Failure/ToCancel.php @@ -18,12 +18,6 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toCancel; - } - #[\NoDiscard] public function queue(): string { diff --git a/src/Failure/ToCloseChannel.php b/src/Failure/ToCloseChannel.php index 8f7585f..1e31a36 100644 --- a/src/Failure/ToCloseChannel.php +++ b/src/Failure/ToCloseChannel.php @@ -14,10 +14,4 @@ final class ToCloseChannel public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toCloseChannel; - } } diff --git a/src/Failure/ToCloseConnection.php b/src/Failure/ToCloseConnection.php index 441db5d..a2c1c7d 100644 --- a/src/Failure/ToCloseConnection.php +++ b/src/Failure/ToCloseConnection.php @@ -14,10 +14,4 @@ final class ToCloseConnection public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toCloseConnection; - } } diff --git a/src/Failure/ToCommit.php b/src/Failure/ToCommit.php index f4dd01b..2cb20b9 100644 --- a/src/Failure/ToCommit.php +++ b/src/Failure/ToCommit.php @@ -14,10 +14,4 @@ final class ToCommit public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toCommit; - } } diff --git a/src/Failure/ToDeclareExchange.php b/src/Failure/ToDeclareExchange.php index eda19a5..624b2c1 100644 --- a/src/Failure/ToDeclareExchange.php +++ b/src/Failure/ToDeclareExchange.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Exchange\Declaration as Command, -}; +use Innmind\AMQP\Model\Exchange\Declaration as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toDeclareExchange; - } } diff --git a/src/Failure/ToDeclareQueue.php b/src/Failure/ToDeclareQueue.php index 818b110..7343f3c 100644 --- a/src/Failure/ToDeclareQueue.php +++ b/src/Failure/ToDeclareQueue.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Queue\Declaration as Command, -}; +use Innmind\AMQP\Model\Queue\Declaration as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toDeclareQueue; - } } diff --git a/src/Failure/ToDeleteExchange.php b/src/Failure/ToDeleteExchange.php index dc262d9..2bd5e17 100644 --- a/src/Failure/ToDeleteExchange.php +++ b/src/Failure/ToDeleteExchange.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Exchange\Deletion as Command, -}; +use Innmind\AMQP\Model\Exchange\Deletion as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toDeleteExchange; - } } diff --git a/src/Failure/ToDeleteQueue.php b/src/Failure/ToDeleteQueue.php index 0ddec3d..0699eea 100644 --- a/src/Failure/ToDeleteQueue.php +++ b/src/Failure/ToDeleteQueue.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Queue\Deletion as Command, -}; +use Innmind\AMQP\Model\Queue\Deletion as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toDeleteQueue; - } } diff --git a/src/Failure/ToGet.php b/src/Failure/ToGet.php index 9e91201..0042109 100644 --- a/src/Failure/ToGet.php +++ b/src/Failure/ToGet.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Basic\Get as Command, -}; +use Innmind\AMQP\Model\Basic\Get as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toGet; - } } diff --git a/src/Failure/ToOpenChannel.php b/src/Failure/ToOpenChannel.php index 91c6bec..9ae1c20 100644 --- a/src/Failure/ToOpenChannel.php +++ b/src/Failure/ToOpenChannel.php @@ -14,10 +14,4 @@ final class ToOpenChannel public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toOpenChannel; - } } diff --git a/src/Failure/ToOpenConnection.php b/src/Failure/ToOpenConnection.php index a7ad378..c5ddf14 100644 --- a/src/Failure/ToOpenConnection.php +++ b/src/Failure/ToOpenConnection.php @@ -14,10 +14,4 @@ final class ToOpenConnection public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toOpenConnection; - } } diff --git a/src/Failure/ToPublish.php b/src/Failure/ToPublish.php index 3d7c188..de2e15b 100644 --- a/src/Failure/ToPublish.php +++ b/src/Failure/ToPublish.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Basic\Publish as Command, -}; +use Innmind\AMQP\Model\Basic\Publish as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toPublish; - } } diff --git a/src/Failure/ToPurge.php b/src/Failure/ToPurge.php index 9a20526..df062b7 100644 --- a/src/Failure/ToPurge.php +++ b/src/Failure/ToPurge.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Queue\Purge as Command, -}; +use Innmind\AMQP\Model\Queue\Purge as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toPurge; - } } diff --git a/src/Failure/ToReadFrame.php b/src/Failure/ToReadFrame.php index aff9fdb..62f61d1 100644 --- a/src/Failure/ToReadFrame.php +++ b/src/Failure/ToReadFrame.php @@ -14,10 +14,4 @@ final class ToReadFrame public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toReadFrame; - } } diff --git a/src/Failure/ToReadMessage.php b/src/Failure/ToReadMessage.php index b1b13df..cef39cb 100644 --- a/src/Failure/ToReadMessage.php +++ b/src/Failure/ToReadMessage.php @@ -14,10 +14,4 @@ final class ToReadMessage public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toReadMessage; - } } diff --git a/src/Failure/ToRecover.php b/src/Failure/ToRecover.php index f23cf63..5b989b1 100644 --- a/src/Failure/ToRecover.php +++ b/src/Failure/ToRecover.php @@ -18,12 +18,6 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toRecover; - } - #[\NoDiscard] public function queue(): string { diff --git a/src/Failure/ToReject.php b/src/Failure/ToReject.php index 5497564..fe445a5 100644 --- a/src/Failure/ToReject.php +++ b/src/Failure/ToReject.php @@ -18,12 +18,6 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toReject; - } - #[\NoDiscard] public function queue(): string { diff --git a/src/Failure/ToRollback.php b/src/Failure/ToRollback.php index 072e068..a0c9305 100644 --- a/src/Failure/ToRollback.php +++ b/src/Failure/ToRollback.php @@ -14,10 +14,4 @@ final class ToRollback public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toRollback; - } } diff --git a/src/Failure/ToSelect.php b/src/Failure/ToSelect.php index ac361af..fa68a9a 100644 --- a/src/Failure/ToSelect.php +++ b/src/Failure/ToSelect.php @@ -14,10 +14,4 @@ final class ToSelect public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toSelect; - } } diff --git a/src/Failure/ToSendFrame.php b/src/Failure/ToSendFrame.php index 26b078a..f3f709d 100644 --- a/src/Failure/ToSendFrame.php +++ b/src/Failure/ToSendFrame.php @@ -14,10 +14,4 @@ final class ToSendFrame public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toSendFrame; - } } diff --git a/src/Failure/ToUnbind.php b/src/Failure/ToUnbind.php index 2f9d9f0..c05aee9 100644 --- a/src/Failure/ToUnbind.php +++ b/src/Failure/ToUnbind.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Queue\Unbinding as Command, -}; +use Innmind\AMQP\Model\Queue\Unbinding as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toUnbind; - } } diff --git a/src/Failure/UnexpectedFrame.php b/src/Failure/UnexpectedFrame.php index 6d3be9a..45f9caf 100644 --- a/src/Failure/UnexpectedFrame.php +++ b/src/Failure/UnexpectedFrame.php @@ -14,10 +14,4 @@ final class UnexpectedFrame public function __construct() { } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::unexpectedFrame; - } } From ca3720e47e6bc42396b56579fc77929ae4de7c65 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 17:52:31 +0200 Subject: [PATCH 03/17] make Client::run() return an Attempt to allow to add more error cases --- CHANGELOG.md | 1 + src/Client.php | 23 ++++++++++++----------- src/Transport/Connection.php | 6 +++--- src/Transport/Connection/Handshake.php | 7 ++++--- src/Transport/Connection/OpenVHost.php | 8 ++++---- src/Transport/Connection/Start.php | 8 ++++---- 6 files changed, 28 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 579c82f..d315b85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - `Innmind\AMQP\Factory::make()` timeout argument is now expressed via `Innmind\TimeContinuum\Period` - `Innmind\AMQP\Model\Basic\Message` expiration is now expressed via `Innmind\TimeContinuum\Period` - `Innmind\AMQP\Failure` is now an exception that wraps each possible failure object +- `Innmind\AMQP\Client::run()` now returns an `Innmind\Immutable\Attempt` ### Fixed diff --git a/src/Client.php b/src/Client.php index 4c4b1b1..313c58b 100644 --- a/src/Client.php +++ b/src/Client.php @@ -15,6 +15,7 @@ Filesystem, }; use Innmind\Immutable\{ + Attempt, Either, Maybe, SideEffect, @@ -24,7 +25,7 @@ final class Client { /** @var Maybe */ private Maybe $command; - /** @var callable(): Maybe */ + /** @var callable(): Attempt */ private $load; private Filesystem $filesystem; /** @var Maybe */ @@ -32,7 +33,7 @@ final class Client /** * @param Maybe $command - * @param callable(): Maybe $load + * @param callable(): Attempt $load * @param Maybe $signals */ private function __construct( @@ -48,7 +49,7 @@ private function __construct( } /** - * @param callable(): Maybe $load + * @param callable(): Attempt $load */ #[\NoDiscard] public static function of(callable $load, Filesystem $filesystem): self @@ -96,10 +97,10 @@ public function listenSignals(CurrentProcess $currentProcess): self * * @param T $state * - * @return Either + * @return Attempt */ #[\NoDiscard] - public function run(mixed $state): Either + public function run(mixed $state): Attempt { return $this->command->match( fn($command) => $this @@ -112,23 +113,22 @@ public function run(mixed $state): Either fn($state) => $this ->close($connection, $channel) ->map(static fn(): mixed => $state->unwrap()), - ); + )->attempt(static fn($failure) => $failure); }), - static fn() => Either::right($state), + static fn() => Attempt::result($state), ); } /** - * @return Either + * @return Attempt */ - private function openChannel(): Either + private function openChannel(): Attempt { // Since the connection is never shared between objects then there is no // need to have a dynamic channel number as there will ALWAYS be one // channel per connection $channel = new Channel(1); - /** @var Either */ return ($this->load)() ->either() ->leftMap(static fn() => Failure::toOpenConnection()) @@ -147,7 +147,8 @@ private function openChannel(): Either )) ->map(static fn() => [$connection, $channel]) ->leftMap(static fn() => Failure::toOpenChannel()), - ); + ) + ->attempt(static fn($failure) => $failure); } /** diff --git a/src/Transport/Connection.php b/src/Transport/Connection.php index 12673b9..ee78749 100644 --- a/src/Transport/Connection.php +++ b/src/Transport/Connection.php @@ -36,6 +36,7 @@ use Innmind\OperatingSystem\Remote; use Innmind\Immutable\{ Str, + Attempt, Maybe, Either, Sequence, @@ -80,7 +81,7 @@ private function __construct( } /** - * @return Maybe + * @return Attempt */ public static function open( Transport $transport, @@ -89,7 +90,7 @@ public static function open( Period $timeout, Clock $clock, Remote $remote, - ): Maybe { + ): Attempt { return $remote ->socket( $transport, @@ -114,7 +115,6 @@ public static function open( (new FrameReader)($protocol), SignalListener::uninstalled(), )) - ->maybe() ->flatMap(new Start($server->authority())) ->flatMap(new Handshake($server->authority())) ->flatMap(new OpenVHost($server->path())); diff --git a/src/Transport/Connection/Handshake.php b/src/Transport/Connection/Handshake.php index 7c654c1..f2828e8 100644 --- a/src/Transport/Connection/Handshake.php +++ b/src/Transport/Connection/Handshake.php @@ -16,6 +16,7 @@ use Innmind\TimeContinuum\Period; use Innmind\Url\Authority; use Innmind\Immutable\{ + Attempt, Maybe, Either, Predicate\Instance, @@ -34,9 +35,9 @@ public function __construct(Authority $authority) } /** - * @return Maybe + * @return Attempt */ - public function __invoke(Connection $connection): Maybe + public function __invoke(Connection $connection): Attempt { return $connection ->wait(Method::connectionSecure, Method::connectionTune) @@ -44,7 +45,7 @@ public function __invoke(Connection $connection): Maybe true => $this->secure($connection), false => $this->maybeTune($connection, $received->frame()), }) - ->maybe(); + ->attempt(static fn($failure) => $failure); } /** diff --git a/src/Transport/Connection/OpenVHost.php b/src/Transport/Connection/OpenVHost.php index 6f5c194..a550da6 100644 --- a/src/Transport/Connection/OpenVHost.php +++ b/src/Transport/Connection/OpenVHost.php @@ -9,7 +9,7 @@ Model\Connection\Open, }; use Innmind\Url\Path; -use Innmind\Immutable\Maybe; +use Innmind\Immutable\Attempt; /** * @internal @@ -24,9 +24,9 @@ public function __construct(Path $vhost) } /** - * @return Maybe + * @return Attempt */ - public function __invoke(Connection $connection): Maybe + public function __invoke(Connection $connection): Attempt { return $connection ->request( @@ -35,7 +35,7 @@ public function __invoke(Connection $connection): Maybe ), Method::connectionOpenOk, ) - ->maybe() + ->attempt(static fn($failure) => $failure) ->map(static fn() => $connection); } } diff --git a/src/Transport/Connection/Start.php b/src/Transport/Connection/Start.php index 67a5ae0..289310a 100644 --- a/src/Transport/Connection/Start.php +++ b/src/Transport/Connection/Start.php @@ -9,7 +9,7 @@ Model\Connection\StartOk, }; use Innmind\Url\Authority; -use Innmind\Immutable\Maybe; +use Innmind\Immutable\Attempt; /** * @internal @@ -24,9 +24,9 @@ public function __construct(Authority $authority) } /** - * @return Maybe + * @return Attempt */ - public function __invoke(Connection $connection): Maybe + public function __invoke(Connection $connection): Attempt { // at this point the server could respond with a simple text "AMQP0xyz" // where xyz represent the version of the protocol it supports meaning @@ -43,7 +43,7 @@ public function __invoke(Connection $connection): Maybe ), ), ) - ->maybe() + ->attempt(static fn($failure) => $failure) ->map(static fn() => $connection); } } From 16b7f0f8c1107878cb88c4154afbe27c4072c45b Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 18:13:08 +0200 Subject: [PATCH 04/17] make Commands return an Attempt --- CHANGELOG.md | 1 + src/Client.php | 13 +++--- src/Command.php | 6 +-- src/Command/Bind.php | 6 +-- src/Command/Consume.php | 70 ++++++++++++++++--------------- src/Command/DeclareExchange.php | 6 +-- src/Command/DeclareQueue.php | 6 +-- src/Command/DeleteExchange.php | 6 +-- src/Command/DeleteQueue.php | 6 +-- src/Command/Get.php | 73 +++++++++++++++------------------ src/Command/Pipe.php | 4 +- src/Command/Publish.php | 23 +++++------ src/Command/Purge.php | 6 +-- src/Command/Qos.php | 6 +-- src/Command/Transaction.php | 26 ++++++------ src/Command/Unbind.php | 6 +-- 16 files changed, 129 insertions(+), 135 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d315b85..6f29137 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - `Innmind\AMQP\Model\Basic\Message` expiration is now expressed via `Innmind\TimeContinuum\Period` - `Innmind\AMQP\Failure` is now an exception that wraps each possible failure object - `Innmind\AMQP\Client::run()` now returns an `Innmind\Immutable\Attempt` +- `Innmind\AMQP\Command::__invoke()` now must return an `Innmind\Immutable\Attempt` ### Fixed diff --git a/src/Client.php b/src/Client.php index 313c58b..a52c661 100644 --- a/src/Client.php +++ b/src/Client.php @@ -16,7 +16,6 @@ }; use Innmind\Immutable\{ Attempt, - Either, Maybe, SideEffect, }; @@ -113,7 +112,7 @@ public function run(mixed $state): Attempt fn($state) => $this ->close($connection, $channel) ->map(static fn(): mixed => $state->unwrap()), - )->attempt(static fn($failure) => $failure); + ); }), static fn() => Attempt::result($state), ); @@ -152,11 +151,10 @@ private function openChannel(): Attempt } /** - * @return Either + * @return Attempt */ - private function close(Connection $connection, Channel $channel): Either + private function close(Connection $connection, Channel $channel): Attempt { - /** @var Either */ return $connection ->request( static fn($protocol) => $protocol->channel()->close( @@ -165,12 +163,11 @@ private function close(Connection $connection, Channel $channel): Either ), Method::channelCloseOk, ) - ->leftMap(static fn() => Failure::toCloseChannel()) + ->attempt(static fn() => Failure::toCloseChannel()) ->flatMap( static fn() => $connection ->close() - ->either() - ->leftMap(static fn() => Failure::toCloseConnection()), + ->attempt(static fn() => Failure::toCloseConnection()), ); } } diff --git a/src/Command.php b/src/Command.php index 0367caf..456b2bd 100644 --- a/src/Command.php +++ b/src/Command.php @@ -8,12 +8,12 @@ Frame\Channel, Connection\MessageReader, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; interface Command { /** - * @return Either + * @return Attempt */ #[\NoDiscard] public function __invoke( @@ -21,5 +21,5 @@ public function __invoke( Channel $channel, MessageReader $read, Client\State $state, - ): Either; + ): Attempt; } diff --git a/src/Command/Bind.php b/src/Command/Bind.php index 8e9af28..e624fab 100644 --- a/src/Command/Bind.php +++ b/src/Command/Bind.php @@ -15,7 +15,7 @@ Model\Queue\Binding, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, }; @@ -34,7 +34,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->queue()->bind( $channel, $this->command, @@ -47,7 +47,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toBind($this->command)); + ->attempt(fn() => Failure::toBind($this->command)); } #[\NoDiscard] diff --git a/src/Command/Consume.php b/src/Command/Consume.php index 8230c95..d338463 100644 --- a/src/Command/Consume.php +++ b/src/Command/Consume.php @@ -22,7 +22,7 @@ }; use Innmind\Immutable\{ Maybe, - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -48,7 +48,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->basic()->consume( $channel, $this->command, @@ -57,6 +57,7 @@ public function __invoke( $sideEffect = match ($this->command->shouldWait()) { true => $connection ->request($frames, Method::basicConsumeOk) + ->attempt(static fn($failure) => $failure) ->flatMap(fn($frame) => $this->maybeStart( $connection, $channel, @@ -66,11 +67,12 @@ public function __invoke( )), false => $connection ->send($frames) - ->map(static fn() => $state), + ->map(static fn() => $state) + ->attempt(static fn($failure) => $failure), }; - return $sideEffect->leftMap( - fn() => Failure::toConsume($this->command), + return $sideEffect->recover( + fn() => Attempt::error(Failure::toConsume($this->command)), ); } @@ -93,7 +95,7 @@ public function handle(callable $consume): self } /** - * @return Either + * @return Attempt */ private function maybeStart( Connection $connection, @@ -101,14 +103,14 @@ private function maybeStart( MessageReader $read, Frame $frame, State $state, - ): Either { + ): Attempt { return $frame ->values() ->first() ->keep(Instance::of(Value\ShortString::class)) ->map(static fn($value) => $value->original()->toString()) ->either() - ->leftMap(fn() => Failure::toConsume($this->command)) + ->attempt(fn() => Failure::toConsume($this->command)) ->flatMap(fn($consumerTag) => $this->start( $connection, $channel, @@ -119,7 +121,7 @@ private function maybeStart( } /** - * @return Either + * @return Attempt */ private function start( Connection $connection, @@ -127,9 +129,9 @@ private function start( MessageReader $read, State $state, string $consumerTag, - ): Either { - /** @var Either */ - $consumed = Either::right($state); + ): Attempt { + /** @var Attempt */ + $consumed = Attempt::result($state); // here the best approach would be to use recursion to avoid unwrapping // the monads but it would end up with a too deep call stack for inifite // consumers as each new message would mean a new function call in the @@ -160,7 +162,7 @@ private function start( } /** - * @return Either + * @return Attempt */ private function waitDeliver( Connection $connection, @@ -168,28 +170,30 @@ private function waitDeliver( State $state, string $consumerTag, MessageReader $read, - ): Either { - /** @var Either */ + ): Attempt { return $connection ->wait(Method::basicDeliver) + ->attempt(static fn($failure) => $failure) ->flatMap( - fn($received) => $read($connection)->flatMap( - fn($message) => $this->maybeConsume( - $connection, - $channel, - $read, - $state, - $consumerTag, - $received->frame(), - $message, + fn($received) => $read($connection) + ->attempt(static fn($failure) => $failure) + ->flatMap( + fn($message) => $this->maybeConsume( + $connection, + $channel, + $read, + $state, + $consumerTag, + $received->frame(), + $message, + ), ), - ), ) - ->leftMap(fn() => Failure::toConsume($this->command)); + ->recover(fn() => Attempt::error(Failure::toConsume($this->command))); } /** - * @return Either + * @return Attempt */ private function maybeConsume( Connection $connection, @@ -199,7 +203,7 @@ private function maybeConsume( string $consumerTag, Frame $frame, Message $message, - ): Either { + ): Attempt { $destinationConsumerTag = $frame ->values() ->first() @@ -232,8 +236,7 @@ private function maybeConsume( ->flatMap(static fn($details) => $destinationConsumerTag->map( static fn() => $details, // this manipulation is to make sure the consumerTag is indeed for this consumer )) - ->either() - ->leftMap(fn() => Failure::toConsume($this->command)) + ->attempt(fn() => Failure::toConsume($this->command)) ->flatMap(fn($details) => $this->consume( $connection, $channel, @@ -246,7 +249,7 @@ private function maybeConsume( } /** - * @return Either + * @return Attempt */ private function consume( Connection $connection, @@ -256,7 +259,7 @@ private function consume( Details $details, Message $message, string $consumerTag, - ): Either { + ): Attempt { return ($this->consume)( $state->unwrap(), $message, @@ -270,6 +273,7 @@ private function consume( $read, $details->deliveryTag(), $consumerTag, - ); + ) + ->attempt(static fn($failure) => $failure); } } diff --git a/src/Command/DeclareExchange.php b/src/Command/DeclareExchange.php index d750cf4..f6e43b4 100644 --- a/src/Command/DeclareExchange.php +++ b/src/Command/DeclareExchange.php @@ -16,7 +16,7 @@ Failure, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, }; @@ -35,7 +35,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->exchange()->declare( $channel, $this->command, @@ -48,7 +48,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toDeclareExchange($this->command)); + ->attempt(fn() => Failure::toDeclareExchange($this->command)); } #[\NoDiscard] diff --git a/src/Command/DeclareQueue.php b/src/Command/DeclareQueue.php index 6da7e3a..517dad8 100644 --- a/src/Command/DeclareQueue.php +++ b/src/Command/DeclareQueue.php @@ -19,7 +19,7 @@ }; use Innmind\Immutable\{ Maybe, - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -39,7 +39,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->queue()->declare( $channel, $this->command, @@ -78,7 +78,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toDeclareQueue($this->command)); + ->attempt(fn() => Failure::toDeclareQueue($this->command)); } #[\NoDiscard] diff --git a/src/Command/DeleteExchange.php b/src/Command/DeleteExchange.php index 57e14bb..e45122a 100644 --- a/src/Command/DeleteExchange.php +++ b/src/Command/DeleteExchange.php @@ -15,7 +15,7 @@ Failure, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, }; @@ -34,7 +34,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->exchange()->delete( $channel, $this->command, @@ -47,7 +47,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toDeleteExchange($this->command)); + ->attempt(fn() => Failure::toDeleteExchange($this->command)); } #[\NoDiscard] diff --git a/src/Command/DeleteQueue.php b/src/Command/DeleteQueue.php index ef5e7b7..2d1fdce 100644 --- a/src/Command/DeleteQueue.php +++ b/src/Command/DeleteQueue.php @@ -18,7 +18,7 @@ Failure, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -38,7 +38,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->queue()->delete( $channel, $this->command, @@ -64,7 +64,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toDeleteQueue($this->command)); + ->attempt(fn() => Failure::toDeleteQueue($this->command)); } #[\NoDiscard] diff --git a/src/Command/Get.php b/src/Command/Get.php index cd91205..ab554f4 100644 --- a/src/Command/Get.php +++ b/src/Command/Get.php @@ -22,7 +22,7 @@ }; use Innmind\Immutable\{ Maybe, - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -52,22 +52,15 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { - /** - * @psalm-suppress MixedArgumentTypeCoercion - * @var Either - */ - return Sequence::of(...\array_fill(0, $this->take, null))->reduce( - Either::right($state), - fn(Either $state) => $state->flatMap( - fn(State $state) => $this->doGet( - $connection, - $channel, - $read, - $state, - ), - ), - ); + ): Attempt { + return Sequence::of(...\array_fill(0, $this->take, null)) + ->sink($state) + ->attempt(fn($state) => $this->doGet( + $connection, + $channel, + $read, + $state, + )); } #[\NoDiscard] @@ -99,15 +92,14 @@ public function take(int $take): self } /** - * @return Either + * @return Attempt */ public function doGet( Connection $connection, Channel $channel, MessageReader $read, State $state, - ): Either { - /** @var Either */ + ): Attempt { return $connection ->request( fn($protocol) => $protocol->basic()->get( @@ -117,6 +109,7 @@ public function doGet( Method::basicGetOk, Method::basicGetEmpty, ) + ->attempt(static fn($failure) => $failure) ->flatMap( fn($frame) => $this->maybeConsume( $connection, @@ -126,11 +119,11 @@ public function doGet( $state, ), ) - ->leftMap(fn() => Failure::toGet($this->command)); + ->recover(fn() => Attempt::error(Failure::toGet($this->command))); } /** - * @return Either + * @return Attempt */ private function maybeConsume( Connection $connection, @@ -138,10 +131,9 @@ private function maybeConsume( MessageReader $read, Frame $frame, State $state, - ): Either { + ): Attempt { if ($frame->is(Method::basicGetEmpty)) { - /** @var Either */ - return Either::right($state); + return Attempt::result($state); } $deliveryTag = $frame @@ -171,27 +163,27 @@ private function maybeConsume( ->map(static fn($value) => $value->original()) ->map(Count::of(...)); - /** @var Either */ return Maybe::all($deliveryTag, $redelivered, $exchange, $routingKey, $messageCount) ->map(Details::ofGet(...)) - ->either() - ->leftMap(fn() => Failure::toGet($this->command)) + ->attempt(fn() => Failure::toGet($this->command)) ->flatMap( - fn($details) => $read($connection)->flatMap( - fn($message) => $this->consume( - $connection, - $channel, - $read, - $state, - $message, - $details, + fn($details) => $read($connection) + ->attempt(static fn($failure) => $failure) + ->flatMap( + fn($message) => $this->consume( + $connection, + $channel, + $read, + $state, + $message, + $details, + ), ), - ), ); } /** - * @return Either + * @return Attempt */ private function consume( Connection $connection, @@ -200,7 +192,7 @@ private function consume( State $state, Message $message, Details $details, - ): Either { + ): Attempt { return ($this->consume)( $state->unwrap(), $message, @@ -217,6 +209,7 @@ private function consume( ->map(static fn($state) => match (true) { $state instanceof Canceled => $state->state(), default => $state, - }); + }) + ->attempt(static fn($failure) => $failure); } } diff --git a/src/Command/Pipe.php b/src/Command/Pipe.php index d4360cc..a13b272 100644 --- a/src/Command/Pipe.php +++ b/src/Command/Pipe.php @@ -10,7 +10,7 @@ Transport\Frame\Channel, Client\State, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; /** * @internal @@ -32,7 +32,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { return ($this->first)($connection, $channel, $read, $state)->flatMap( fn($state) => ($this->second)( $connection, diff --git a/src/Command/Publish.php b/src/Command/Publish.php index 39c128a..e0c4a15 100644 --- a/src/Command/Publish.php +++ b/src/Command/Publish.php @@ -14,7 +14,7 @@ Model\Basic\Message, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, SideEffect, }; @@ -38,16 +38,15 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { - /** @var Either */ + ): Attempt { return $this ->commands - ->reduce( - Either::right(new SideEffect), - fn(Either $state, $command) => $state->flatMap( - fn() => $this->publish($connection, $channel, $command), - ), - ) + ->sink(SideEffect::identity()) + ->attempt(fn($_, $command) => $this->publish( + $connection, + $channel, + $command, + )) ->map(static fn() => $state); } @@ -83,19 +82,19 @@ public function withRoutingKey(string $routingKey): self } /** - * @return Either + * @return Attempt */ private function publish( Connection $connection, Channel $channel, Model $command, - ): Either { + ): Attempt { return $connection ->send(static fn($protocol, $maxFrameSize) => $protocol->basic()->publish( $channel, $command, $maxFrameSize, )) - ->leftMap(static fn() => Failure::toPublish($command)); + ->attempt(static fn() => Failure::toPublish($command)); } } diff --git a/src/Command/Purge.php b/src/Command/Purge.php index 89ddad2..975bfb7 100644 --- a/src/Command/Purge.php +++ b/src/Command/Purge.php @@ -18,7 +18,7 @@ Model\Count, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -38,7 +38,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->queue()->purge( $channel, $this->command, @@ -64,7 +64,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toPurge($this->command)); + ->attempt(fn() => Failure::toPurge($this->command)); } #[\NoDiscard] diff --git a/src/Command/Qos.php b/src/Command/Qos.php index 4959c99..1d932b0 100644 --- a/src/Command/Qos.php +++ b/src/Command/Qos.php @@ -13,7 +13,7 @@ Transport\Frame\Method, Model\Basic\Qos as Model, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; final class Qos implements Command { @@ -30,7 +30,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { return $connection ->request( fn($protocol) => $protocol->basic()->qos( @@ -40,7 +40,7 @@ public function __invoke( Method::basicQosOk, ) ->map(static fn() => $state) - ->leftMap(static fn() => Failure::toAdjustQos()); + ->attempt(static fn() => Failure::toAdjustQos()); } /** diff --git a/src/Command/Transaction.php b/src/Command/Transaction.php index 1d0524d..f7227d4 100644 --- a/src/Command/Transaction.php +++ b/src/Command/Transaction.php @@ -12,7 +12,7 @@ Transport\Frame\Channel, Transport\Frame\Method, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; final class Transaction implements Command { @@ -35,7 +35,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { return $this ->select($connection, $channel) ->flatMap(fn($connection) => ($this->command)( @@ -72,29 +72,29 @@ public function with(Command $command): self } /** - * @return Either + * @return Attempt */ private function select( Connection $connection, Channel $channel, - ): Either { + ): Attempt { return $connection ->request( static fn($protocol) => $protocol->transaction()->select($channel), Method::transactionSelectOk, ) ->map(static fn() => $connection) - ->leftMap(static fn() => Failure::toSelect()); + ->attempt(static fn() => Failure::toSelect()); } /** - * @return Either + * @return Attempt */ private function finish( Connection $connection, Channel $channel, State $state, - ): Either { + ): Attempt { return match (($this->predicate)($state->unwrap())) { true => $this->commit($connection, $channel, $state), false => $this->rollback($connection, $channel, $state), @@ -102,36 +102,36 @@ private function finish( } /** - * @return Either + * @return Attempt */ private function commit( Connection $connection, Channel $channel, State $state, - ): Either { + ): Attempt { return $connection ->request( static fn($protocol) => $protocol->transaction()->commit($channel), Method::transactionCommitOk, ) ->map(static fn() => $state) - ->leftMap(static fn() => Failure::toCommit()); + ->attempt(static fn() => Failure::toCommit()); } /** - * @return Either + * @return Attempt */ private function rollback( Connection $connection, Channel $channel, State $state, - ): Either { + ): Attempt { return $connection ->request( static fn($protocol) => $protocol->transaction()->rollback($channel), Method::transactionRollbackOk, ) ->map(static fn() => $state) - ->leftMap(static fn() => Failure::toRollback()); + ->attempt(static fn() => Failure::toRollback()); } } diff --git a/src/Command/Unbind.php b/src/Command/Unbind.php index 7e82087..4f3a478 100644 --- a/src/Command/Unbind.php +++ b/src/Command/Unbind.php @@ -13,7 +13,7 @@ Transport\Frame\Method, Model\Queue\Unbinding, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; final class Unbind implements Command { @@ -30,7 +30,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { return $connection ->request( fn($protocol) => $protocol->queue()->unbind( @@ -40,7 +40,7 @@ public function __invoke( Method::queueUnbindOk, ) ->map(static fn() => $state) - ->leftMap(fn() => Failure::toUnbind($this->command)); + ->attempt(fn() => Failure::toUnbind($this->command)); } #[\NoDiscard] From 302377dd7054a26d783e4051985fd0169fbce26b Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 18:43:39 +0200 Subject: [PATCH 05/17] use Attempt as return type for all Connection methods --- src/Client.php | 12 ++-- src/Command/Bind.php | 2 +- src/Command/Consume.php | 5 +- src/Command/DeclareExchange.php | 2 +- src/Command/DeclareQueue.php | 4 +- src/Command/DeleteExchange.php | 2 +- src/Command/DeleteQueue.php | 4 +- src/Command/Get.php | 1 - src/Command/Publish.php | 2 +- src/Command/Purge.php | 4 +- src/Command/Qos.php | 2 +- src/Command/Transaction.php | 6 +- src/Command/Unbind.php | 2 +- src/Consumer/Continuation.php | 13 ++++- src/Transport/Connection.php | 65 +++++++++------------ src/Transport/Connection/Handshake.php | 25 ++++---- src/Transport/Connection/MessageReader.php | 2 + src/Transport/Connection/OpenVHost.php | 1 - src/Transport/Connection/SignalListener.php | 17 +++--- src/Transport/Connection/Start.php | 1 - 20 files changed, 85 insertions(+), 87 deletions(-) diff --git a/src/Client.php b/src/Client.php index a52c661..4ee00ff 100644 --- a/src/Client.php +++ b/src/Client.php @@ -129,8 +129,7 @@ private function openChannel(): Attempt $channel = new Channel(1); return ($this->load)() - ->either() - ->leftMap(static fn() => Failure::toOpenConnection()) + ->recover(static fn() => Attempt::error(Failure::toOpenConnection())) ->flatMap( fn($connection) => $connection ->request( @@ -145,9 +144,8 @@ private function openChannel(): Attempt static fn() => null, )) ->map(static fn() => [$connection, $channel]) - ->leftMap(static fn() => Failure::toOpenChannel()), - ) - ->attempt(static fn($failure) => $failure); + ->recover(static fn() => Attempt::error(Failure::toOpenChannel())), + ); } /** @@ -163,11 +161,11 @@ private function close(Connection $connection, Channel $channel): Attempt ), Method::channelCloseOk, ) - ->attempt(static fn() => Failure::toCloseChannel()) + ->recover(static fn() => Attempt::error(Failure::toCloseChannel())) ->flatMap( static fn() => $connection ->close() - ->attempt(static fn() => Failure::toCloseConnection()), + ->recover(static fn() => Attempt::error(Failure::toCloseConnection())), ); } } diff --git a/src/Command/Bind.php b/src/Command/Bind.php index e624fab..0ce074e 100644 --- a/src/Command/Bind.php +++ b/src/Command/Bind.php @@ -47,7 +47,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->attempt(fn() => Failure::toBind($this->command)); + ->recover(fn() => Attempt::error(Failure::toBind($this->command))); } #[\NoDiscard] diff --git a/src/Command/Consume.php b/src/Command/Consume.php index d338463..eba7a6b 100644 --- a/src/Command/Consume.php +++ b/src/Command/Consume.php @@ -57,7 +57,6 @@ public function __invoke( $sideEffect = match ($this->command->shouldWait()) { true => $connection ->request($frames, Method::basicConsumeOk) - ->attempt(static fn($failure) => $failure) ->flatMap(fn($frame) => $this->maybeStart( $connection, $channel, @@ -67,8 +66,7 @@ public function __invoke( )), false => $connection ->send($frames) - ->map(static fn() => $state) - ->attempt(static fn($failure) => $failure), + ->map(static fn() => $state), }; return $sideEffect->recover( @@ -173,7 +171,6 @@ private function waitDeliver( ): Attempt { return $connection ->wait(Method::basicDeliver) - ->attempt(static fn($failure) => $failure) ->flatMap( fn($received) => $read($connection) ->attempt(static fn($failure) => $failure) diff --git a/src/Command/DeclareExchange.php b/src/Command/DeclareExchange.php index f6e43b4..2dd2089 100644 --- a/src/Command/DeclareExchange.php +++ b/src/Command/DeclareExchange.php @@ -48,7 +48,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->attempt(fn() => Failure::toDeclareExchange($this->command)); + ->recover(fn() => Attempt::error(Failure::toDeclareExchange($this->command))); } #[\NoDiscard] diff --git a/src/Command/DeclareQueue.php b/src/Command/DeclareQueue.php index 517dad8..bd3122a 100644 --- a/src/Command/DeclareQueue.php +++ b/src/Command/DeclareQueue.php @@ -71,14 +71,14 @@ public function __invoke( // maybe in the future we could expose this info to the user return Maybe::all($name, $message, $consumer) ->map(DeclareOk::of(...)) - ->either(); + ->attempt(static fn() => new \RuntimeException('Invalid declare.ok response')); }), false => $connection->send($frames), }; return $sideEffect ->map(static fn() => $state) - ->attempt(fn() => Failure::toDeclareQueue($this->command)); + ->recover(fn() => Attempt::error(Failure::toDeclareQueue($this->command))); } #[\NoDiscard] diff --git a/src/Command/DeleteExchange.php b/src/Command/DeleteExchange.php index e45122a..ab8cd95 100644 --- a/src/Command/DeleteExchange.php +++ b/src/Command/DeleteExchange.php @@ -47,7 +47,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->attempt(fn() => Failure::toDeleteExchange($this->command)); + ->recover(fn() => Attempt::error(Failure::toDeleteExchange($this->command))); } #[\NoDiscard] diff --git a/src/Command/DeleteQueue.php b/src/Command/DeleteQueue.php index 2d1fdce..92cccae 100644 --- a/src/Command/DeleteQueue.php +++ b/src/Command/DeleteQueue.php @@ -57,14 +57,14 @@ public function __invoke( ->map(static fn($value) => $value->original()) ->map(Count::of(...)) ->map(DeleteOk::of(...)) - ->either(), + ->attempt(static fn() => new \RuntimeException('Unable to find message count')), ), false => $connection->send($frames), }; return $sideEffect ->map(static fn() => $state) - ->attempt(fn() => Failure::toDeleteQueue($this->command)); + ->recover(fn() => Attempt::error(Failure::toDeleteQueue($this->command))); } #[\NoDiscard] diff --git a/src/Command/Get.php b/src/Command/Get.php index ab554f4..ff5a476 100644 --- a/src/Command/Get.php +++ b/src/Command/Get.php @@ -109,7 +109,6 @@ public function doGet( Method::basicGetOk, Method::basicGetEmpty, ) - ->attempt(static fn($failure) => $failure) ->flatMap( fn($frame) => $this->maybeConsume( $connection, diff --git a/src/Command/Publish.php b/src/Command/Publish.php index e0c4a15..0bca3a7 100644 --- a/src/Command/Publish.php +++ b/src/Command/Publish.php @@ -95,6 +95,6 @@ private function publish( $command, $maxFrameSize, )) - ->attempt(static fn() => Failure::toPublish($command)); + ->recover(static fn() => Attempt::error(Failure::toPublish($command))); } } diff --git a/src/Command/Purge.php b/src/Command/Purge.php index 975bfb7..0cf5f41 100644 --- a/src/Command/Purge.php +++ b/src/Command/Purge.php @@ -57,14 +57,14 @@ public function __invoke( ->map(static fn($value) => $value->original()) ->map(Count::of(...)) ->map(PurgeOk::of(...)) - ->either(), + ->attempt(static fn() => new \RuntimeException('Unable to find message count')), ), false => $connection->send($frames), }; return $sideEffect ->map(static fn() => $state) - ->attempt(fn() => Failure::toPurge($this->command)); + ->recover(fn() => Attempt::error(Failure::toPurge($this->command))); } #[\NoDiscard] diff --git a/src/Command/Qos.php b/src/Command/Qos.php index 1d932b0..f661900 100644 --- a/src/Command/Qos.php +++ b/src/Command/Qos.php @@ -40,7 +40,7 @@ public function __invoke( Method::basicQosOk, ) ->map(static fn() => $state) - ->attempt(static fn() => Failure::toAdjustQos()); + ->recover(static fn() => Attempt::error(Failure::toAdjustQos())); } /** diff --git a/src/Command/Transaction.php b/src/Command/Transaction.php index f7227d4..391398c 100644 --- a/src/Command/Transaction.php +++ b/src/Command/Transaction.php @@ -84,7 +84,7 @@ private function select( Method::transactionSelectOk, ) ->map(static fn() => $connection) - ->attempt(static fn() => Failure::toSelect()); + ->recover(static fn() => Attempt::error(Failure::toSelect())); } /** @@ -115,7 +115,7 @@ private function commit( Method::transactionCommitOk, ) ->map(static fn() => $state) - ->attempt(static fn() => Failure::toCommit()); + ->recover(static fn() => Attempt::error(Failure::toCommit())); } /** @@ -132,6 +132,6 @@ private function rollback( Method::transactionRollbackOk, ) ->map(static fn() => $state) - ->attempt(static fn() => Failure::toRollback()); + ->recover(static fn() => Attempt::error(Failure::toRollback())); } } diff --git a/src/Command/Unbind.php b/src/Command/Unbind.php index 4f3a478..2d48f6c 100644 --- a/src/Command/Unbind.php +++ b/src/Command/Unbind.php @@ -40,7 +40,7 @@ public function __invoke( Method::queueUnbindOk, ) ->map(static fn() => $state) - ->attempt(fn() => Failure::toUnbind($this->command)); + ->recover(fn() => Attempt::error(Failure::toUnbind($this->command))); } #[\NoDiscard] diff --git a/src/Consumer/Continuation.php b/src/Consumer/Continuation.php index 8cd1453..386ef67 100644 --- a/src/Consumer/Continuation.php +++ b/src/Consumer/Continuation.php @@ -125,9 +125,11 @@ private function recover( // read all the frames for the prefetched message then wait for next // frame $received = $received->flatMap( - static fn() => $read($connection)->flatMap( - static fn() => $connection->wait(), - ), + static fn() => $read($connection) + ->attempt(static fn($failure) => $failure) + ->flatMap( + static fn() => $connection->wait(), + ), ); $walkOverPrefetchedMessages = $received->match( static fn($received) => $received->is(Method::basicDeliver), @@ -151,6 +153,7 @@ private function recover( ), Method::basicRecoverOk, )) + ->either() ->map(fn() => Canceled::of($this->state)) ->leftMap(static fn() => Failure::toRecover($queue)); } @@ -171,6 +174,7 @@ private function doAck( $channel, Ack::of($deliveryTag), )) + ->either() ->leftMap(static fn() => Failure::toAck($queue)); } @@ -190,6 +194,7 @@ private function doReject( $channel, Reject::of($deliveryTag), )) + ->either() ->leftMap(static fn() => Failure::toReject($queue)); } @@ -209,6 +214,7 @@ private function doRequeue( $channel, Reject::requeue($deliveryTag), )) + ->either() ->leftMap(static fn() => Failure::toReject($queue)); } @@ -231,6 +237,7 @@ private function doCancel( $channel, Cancel::of($consumerTag), )) + ->either() ->leftMap(static fn() => Failure::toCancel($queue)); } } diff --git a/src/Transport/Connection.php b/src/Transport/Connection.php index ee78749..cc1fbaa 100644 --- a/src/Transport/Connection.php +++ b/src/Transport/Connection.php @@ -38,7 +38,6 @@ Str, Attempt, Maybe, - Either, Sequence, SideEffect, Predicate\Instance, @@ -123,9 +122,9 @@ public static function open( /** * @param callable(Protocol, MaxFrameSize): Sequence $frames * - * @return Either + * @return Attempt */ - public function respondTo(Method $method, callable $frames): Either + public function respondTo(Method $method, callable $frames): Attempt { return $this ->wait($method) @@ -137,9 +136,9 @@ public function respondTo(Method $method, callable $frames): Either /** * @param callable(Protocol, MaxFrameSize): Sequence $frames * - * @return Either + * @return Attempt */ - public function request(callable $frames, Method $method, Method ...$methods): Either + public function request(callable $frames, Method $method, Method ...$methods): Attempt { return $this ->sendFrames($frames) @@ -150,17 +149,17 @@ public function request(callable $frames, Method $method, Method ...$methods): E /** * @param callable(Protocol, MaxFrameSize): Sequence $frames * - * @return Either + * @return Attempt */ - public function send(callable $frames): Either + public function send(callable $frames): Attempt { return $this->sendFrames($frames); } /** - * @return Either + * @return Attempt */ - public function wait(Method ...$names): Either + public function wait(Method ...$names): Attempt { return $this ->socket @@ -175,7 +174,6 @@ public function wait(Method ...$names): Either ->one() ->map(ReceivedFrame::of(...)) ->map($this->flagActive(...)) - ->either() ->eitherWay( fn($received) => match ($received->frame()->type()) { Type::heartbeat => $this->wait(...$names), @@ -183,15 +181,15 @@ public function wait(Method ...$names): Either }, fn() => $this->signals->close( $this, - static fn() => Either::left(Failure::toReadFrame()), + static fn() => Attempt::error(Failure::toReadFrame()), ), ); } /** - * @return Maybe + * @return Attempt */ - public function close(): Maybe + public function close(): Attempt { $this->signals->uninstall(); @@ -200,19 +198,18 @@ public function close(): Maybe static fn($protocol) => $protocol->connection()->close(Close::demand()), Method::connectionCloseOk, ) - ->maybe() - ->flatMap(fn() => $this->socket->close()->maybe()) + ->flatMap(fn() => $this->socket->close()) ->map(static fn() => new SideEffect); } /** - * @return Maybe + * @return Attempt */ public function tune( MaxChannels $maxChannels, MaxFrameSize $maxFrameSize, Period $heartbeat, - ): Maybe { + ): Attempt { return $this ->send(static fn($protocol) => $protocol->connection()->tuneOk( TuneOk::of( @@ -221,7 +218,6 @@ public function tune( $heartbeat, ), )) - ->maybe() ->map(fn() => new self( $this->protocol, $this->heartbeat->adjust($heartbeat), @@ -257,9 +253,9 @@ private function flagActive(ReceivedFrame $received): ReceivedFrame * @throws FrameChannelExceedAllowedChannelNumber * @throws FrameExceedAllowedSize * - * @return Either + * @return Attempt */ - private function sendFrames(callable $frames): Either + private function sendFrames(callable $frames): Attempt { $data = $frames($this->protocol, $this->maxFrameSize) ->map(function($frame) { @@ -278,44 +274,41 @@ private function sendFrames(callable $frames): Either ->socket ->abortWhen($this->signals->notified(...)) ->sink($data) - ->either() ->eitherWay( - static fn() => Either::right(new SideEffect), + static fn() => Attempt::result(new SideEffect), fn() => $this->signals->close( $this, - static fn() => Either::left(Failure::toSendFrame()), + static fn() => Attempt::error(Failure::toSendFrame()), ), ); } /** - * @return Either + * @return Attempt */ private function ensureValidFrame( ReceivedFrame $received, Method ...$names, - ): Either { + ): Attempt { if (\count($names) === 0) { - /** @var Either */ - return Either::right($received); + return Attempt::result($received); } if ($received->frame()->type() !== Type::method) { // someone must have forgot a wait() call - /** @var Either */ - return Either::left(Failure::unexpectedFrame()); + /** @var Attempt */ + return Attempt::error(Failure::unexpectedFrame()); } if ($received->oneOf(...$names)) { - /** @var Either */ - return Either::right($received); + return Attempt::result($received); } if ($received->is(Method::connectionClose)) { - /** @var Either */ + /** @var Attempt */ return $this ->send(static fn($protocol) => $protocol->connection()->closeOk()) - ->leftMap(static fn() => Failure::toCloseConnection()) + ->recover(static fn() => Attempt::error(Failure::toCloseConnection())) ->flatMap(static function() use ($received) { $message = $received ->frame() @@ -355,11 +348,11 @@ private function ensureValidFrame( static fn(int $class, int $method) => Method::of($class, $method), ); - return Either::left(Failure::closedByServer($message, $code, $method)); + return Attempt::error(Failure::closedByServer($message, $code, $method)); }); } - /** @var Either */ - return Either::left(Failure::unexpectedFrame()); + /** @var Attempt */ + return Attempt::error(Failure::unexpectedFrame()); } } diff --git a/src/Transport/Connection/Handshake.php b/src/Transport/Connection/Handshake.php index f2828e8..d2b1d99 100644 --- a/src/Transport/Connection/Handshake.php +++ b/src/Transport/Connection/Handshake.php @@ -18,7 +18,6 @@ use Innmind\Immutable\{ Attempt, Maybe, - Either, Predicate\Instance, }; @@ -44,14 +43,13 @@ public function __invoke(Connection $connection): Attempt ->flatMap(fn($received) => match ($received->is(Method::connectionSecure)) { true => $this->secure($connection), false => $this->maybeTune($connection, $received->frame()), - }) - ->attempt(static fn($failure) => $failure); + }); } /** - * @return Either + * @return Attempt */ - private function secure(Connection $connection): Either + private function secure(Connection $connection): Attempt { return $connection ->request( @@ -67,9 +65,9 @@ private function secure(Connection $connection): Either } /** - * @return Either + * @return Attempt */ - private function maybeTune(Connection $connection, Frame $frame): Either + private function maybeTune(Connection $connection, Frame $frame): Attempt { $maxChannels = $frame ->values() @@ -91,8 +89,15 @@ private function maybeTune(Connection $connection, Frame $frame): Either ->map(Period::millisecond(...)); return Maybe::all($maxChannels, $maxFrameSize, $heartbeat) - ->flatMap($connection->tune(...)) - ->either() - ->leftMap(static fn() => Failure::toOpenConnection()); + ->flatMap( + static fn( + MaxChannels $maxChannels, + MaxFrameSize $maxFrameSize, + Period $heartbeat, + ) => $connection + ->tune($maxChannels, $maxFrameSize, $heartbeat) + ->maybe(), + ) + ->attempt(static fn() => Failure::toOpenConnection()); } } diff --git a/src/Transport/Connection/MessageReader.php b/src/Transport/Connection/MessageReader.php index f9eb361..d4f929d 100644 --- a/src/Transport/Connection/MessageReader.php +++ b/src/Transport/Connection/MessageReader.php @@ -52,6 +52,8 @@ public function __invoke(Connection $connection): Either { return $connection ->wait() + ->either() + ->leftMap(static fn() => Failure::unexpectedFrame()) // todo delete ->flatMap(fn($received) => $this->decode( $connection, $received->frame(), diff --git a/src/Transport/Connection/OpenVHost.php b/src/Transport/Connection/OpenVHost.php index a550da6..b6285fa 100644 --- a/src/Transport/Connection/OpenVHost.php +++ b/src/Transport/Connection/OpenVHost.php @@ -35,7 +35,6 @@ public function __invoke(Connection $connection): Attempt ), Method::connectionOpenOk, ) - ->attempt(static fn($failure) => $failure) ->map(static fn() => $connection); } } diff --git a/src/Transport/Connection/SignalListener.php b/src/Transport/Connection/SignalListener.php index 861bef4..73fe4a5 100644 --- a/src/Transport/Connection/SignalListener.php +++ b/src/Transport/Connection/SignalListener.php @@ -13,7 +13,7 @@ use Innmind\OperatingSystem\CurrentProcess\Signals; use Innmind\Signals\Signal; use Innmind\Immutable\{ - Either, + Attempt, Maybe, }; @@ -100,21 +100,21 @@ public function notified(): bool /** * @template T * - * @param callable(): Either $continue + * @param callable(): Attempt $continue * - * @return Either + * @return Attempt */ - public function close(Connection $connection, callable $continue): Either + public function close(Connection $connection, callable $continue): Attempt { return Maybe::all($this->received, $this->channel) ->map(static fn(Signal $signal, Channel $channel) => [$signal, $channel]) ->filter(fn() => !$this->closing) - ->either() ->match( function($in) use ($connection) { $this->closing = true; [$signal, $channel] = $in; + /** @var Attempt Todo fix */ return $connection ->request( static fn($protocol) => $protocol->channel()->close( @@ -123,14 +123,13 @@ function($in) use ($connection) { ), Method::channelCloseOk, ) - ->leftMap(static fn() => Failure::toCloseChannel()) + ->recover(static fn() => Attempt::error(Failure::toCloseChannel())) ->flatMap( static fn() => $connection ->close() - ->either() - ->leftMap(static fn() => Failure::toCloseConnection()), + ->recover(static fn() => Attempt::error(Failure::toCloseConnection())), ) - ->flatMap(static fn() => Either::left(Failure::closedBySignal($signal))); + ->recover(static fn() => Attempt::error(Failure::closedBySignal($signal))); }, static fn() => $continue(), ); diff --git a/src/Transport/Connection/Start.php b/src/Transport/Connection/Start.php index 289310a..7625c4e 100644 --- a/src/Transport/Connection/Start.php +++ b/src/Transport/Connection/Start.php @@ -43,7 +43,6 @@ public function __invoke(Connection $connection): Attempt ), ), ) - ->attempt(static fn($failure) => $failure) ->map(static fn() => $connection); } } From 279ca53ef44572fa989626d1d7f4208ed7b55201 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 19:06:54 +0200 Subject: [PATCH 06/17] keep track of previous errors --- src/Client.php | 8 ++++---- src/Command/Bind.php | 2 +- src/Command/Consume.php | 6 +++--- src/Command/DeclareExchange.php | 2 +- src/Command/DeclareQueue.php | 2 +- src/Command/DeleteExchange.php | 2 +- src/Command/DeleteQueue.php | 2 +- src/Command/Get.php | 2 +- src/Command/Publish.php | 2 +- src/Command/Purge.php | 2 +- src/Command/Qos.php | 2 +- src/Command/Transaction.php | 6 +++--- src/Command/Unbind.php | 2 +- src/Failure.php | 14 ++++++++++++++ src/Transport/Connection.php | 2 +- src/Transport/Connection/SignalListener.php | 6 +++--- 16 files changed, 38 insertions(+), 24 deletions(-) diff --git a/src/Client.php b/src/Client.php index 4ee00ff..095aff0 100644 --- a/src/Client.php +++ b/src/Client.php @@ -129,7 +129,7 @@ private function openChannel(): Attempt $channel = new Channel(1); return ($this->load)() - ->recover(static fn() => Attempt::error(Failure::toOpenConnection())) + ->mapError(Failure::as(Failure::toOpenConnection())) ->flatMap( fn($connection) => $connection ->request( @@ -144,7 +144,7 @@ private function openChannel(): Attempt static fn() => null, )) ->map(static fn() => [$connection, $channel]) - ->recover(static fn() => Attempt::error(Failure::toOpenChannel())), + ->mapError(Failure::as(Failure::toOpenChannel())), ); } @@ -161,11 +161,11 @@ private function close(Connection $connection, Channel $channel): Attempt ), Method::channelCloseOk, ) - ->recover(static fn() => Attempt::error(Failure::toCloseChannel())) + ->mapError(Failure::as(Failure::toCloseChannel())) ->flatMap( static fn() => $connection ->close() - ->recover(static fn() => Attempt::error(Failure::toCloseConnection())), + ->mapError(Failure::as(Failure::toCloseConnection())), ); } } diff --git a/src/Command/Bind.php b/src/Command/Bind.php index 0ce074e..8a94029 100644 --- a/src/Command/Bind.php +++ b/src/Command/Bind.php @@ -47,7 +47,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->recover(fn() => Attempt::error(Failure::toBind($this->command))); + ->mapError(Failure::as(Failure::toBind($this->command))); } #[\NoDiscard] diff --git a/src/Command/Consume.php b/src/Command/Consume.php index eba7a6b..6cf0a69 100644 --- a/src/Command/Consume.php +++ b/src/Command/Consume.php @@ -69,8 +69,8 @@ public function __invoke( ->map(static fn() => $state), }; - return $sideEffect->recover( - fn() => Attempt::error(Failure::toConsume($this->command)), + return $sideEffect->mapError( + Failure::as(Failure::toConsume($this->command)), ); } @@ -186,7 +186,7 @@ private function waitDeliver( ), ), ) - ->recover(fn() => Attempt::error(Failure::toConsume($this->command))); + ->mapError(Failure::as(Failure::toConsume($this->command))); } /** diff --git a/src/Command/DeclareExchange.php b/src/Command/DeclareExchange.php index 2dd2089..0606be9 100644 --- a/src/Command/DeclareExchange.php +++ b/src/Command/DeclareExchange.php @@ -48,7 +48,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->recover(fn() => Attempt::error(Failure::toDeclareExchange($this->command))); + ->mapError(Failure::as(Failure::toDeclareExchange($this->command))); } #[\NoDiscard] diff --git a/src/Command/DeclareQueue.php b/src/Command/DeclareQueue.php index bd3122a..4f922f6 100644 --- a/src/Command/DeclareQueue.php +++ b/src/Command/DeclareQueue.php @@ -78,7 +78,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->recover(fn() => Attempt::error(Failure::toDeclareQueue($this->command))); + ->mapError(Failure::as(Failure::toDeclareQueue($this->command))); } #[\NoDiscard] diff --git a/src/Command/DeleteExchange.php b/src/Command/DeleteExchange.php index ab8cd95..41d0559 100644 --- a/src/Command/DeleteExchange.php +++ b/src/Command/DeleteExchange.php @@ -47,7 +47,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->recover(fn() => Attempt::error(Failure::toDeleteExchange($this->command))); + ->mapError(Failure::as(Failure::toDeleteExchange($this->command))); } #[\NoDiscard] diff --git a/src/Command/DeleteQueue.php b/src/Command/DeleteQueue.php index 92cccae..12c989f 100644 --- a/src/Command/DeleteQueue.php +++ b/src/Command/DeleteQueue.php @@ -64,7 +64,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->recover(fn() => Attempt::error(Failure::toDeleteQueue($this->command))); + ->mapError(Failure::as(Failure::toDeleteQueue($this->command))); } #[\NoDiscard] diff --git a/src/Command/Get.php b/src/Command/Get.php index ff5a476..75c4670 100644 --- a/src/Command/Get.php +++ b/src/Command/Get.php @@ -118,7 +118,7 @@ public function doGet( $state, ), ) - ->recover(fn() => Attempt::error(Failure::toGet($this->command))); + ->mapError(Failure::as(Failure::toGet($this->command))); } /** diff --git a/src/Command/Publish.php b/src/Command/Publish.php index 0bca3a7..83e330b 100644 --- a/src/Command/Publish.php +++ b/src/Command/Publish.php @@ -95,6 +95,6 @@ private function publish( $command, $maxFrameSize, )) - ->recover(static fn() => Attempt::error(Failure::toPublish($command))); + ->mapError(Failure::as(Failure::toPublish($command))); } } diff --git a/src/Command/Purge.php b/src/Command/Purge.php index 0cf5f41..7694181 100644 --- a/src/Command/Purge.php +++ b/src/Command/Purge.php @@ -64,7 +64,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->recover(fn() => Attempt::error(Failure::toPurge($this->command))); + ->mapError(Failure::as(Failure::toPurge($this->command))); } #[\NoDiscard] diff --git a/src/Command/Qos.php b/src/Command/Qos.php index f661900..1dbc95a 100644 --- a/src/Command/Qos.php +++ b/src/Command/Qos.php @@ -40,7 +40,7 @@ public function __invoke( Method::basicQosOk, ) ->map(static fn() => $state) - ->recover(static fn() => Attempt::error(Failure::toAdjustQos())); + ->mapError(Failure::as(Failure::toAdjustQos())); } /** diff --git a/src/Command/Transaction.php b/src/Command/Transaction.php index 391398c..b5c78f4 100644 --- a/src/Command/Transaction.php +++ b/src/Command/Transaction.php @@ -84,7 +84,7 @@ private function select( Method::transactionSelectOk, ) ->map(static fn() => $connection) - ->recover(static fn() => Attempt::error(Failure::toSelect())); + ->mapError(Failure::as(Failure::toSelect())); } /** @@ -115,7 +115,7 @@ private function commit( Method::transactionCommitOk, ) ->map(static fn() => $state) - ->recover(static fn() => Attempt::error(Failure::toCommit())); + ->mapError(Failure::as(Failure::toCommit())); } /** @@ -132,6 +132,6 @@ private function rollback( Method::transactionRollbackOk, ) ->map(static fn() => $state) - ->recover(static fn() => Attempt::error(Failure::toRollback())); + ->mapError(Failure::as(Failure::toRollback())); } } diff --git a/src/Command/Unbind.php b/src/Command/Unbind.php index 2d48f6c..64ec6b6 100644 --- a/src/Command/Unbind.php +++ b/src/Command/Unbind.php @@ -40,7 +40,7 @@ public function __invoke( Method::queueUnbindOk, ) ->map(static fn() => $state) - ->recover(fn() => Attempt::error(Failure::toUnbind($this->command))); + ->mapError(Failure::as(Failure::toUnbind($this->command))); } #[\NoDiscard] diff --git a/src/Failure.php b/src/Failure.php index eff8923..bf3a3c8 100644 --- a/src/Failure.php +++ b/src/Failure.php @@ -12,7 +12,21 @@ final class Failure extends Exception\RuntimeException private function __construct( private object $failure, private Failure\Kind $kind, + ?\Throwable $previous = null, ) { + parent::__construct('', 0, $previous); + } + + /** + * @return callable(\Throwable): \Throwable + */ + public static function as(self $failure): callable + { + return static fn(\Throwable $e) => new self( + $failure->failure, + $failure->kind, + $e, + ); } #[\NoDiscard] diff --git a/src/Transport/Connection.php b/src/Transport/Connection.php index cc1fbaa..8186fbb 100644 --- a/src/Transport/Connection.php +++ b/src/Transport/Connection.php @@ -308,7 +308,7 @@ private function ensureValidFrame( /** @var Attempt */ return $this ->send(static fn($protocol) => $protocol->connection()->closeOk()) - ->recover(static fn() => Attempt::error(Failure::toCloseConnection())) + ->mapError(Failure::as(Failure::toCloseConnection())) ->flatMap(static function() use ($received) { $message = $received ->frame() diff --git a/src/Transport/Connection/SignalListener.php b/src/Transport/Connection/SignalListener.php index 73fe4a5..7b93a1a 100644 --- a/src/Transport/Connection/SignalListener.php +++ b/src/Transport/Connection/SignalListener.php @@ -123,13 +123,13 @@ function($in) use ($connection) { ), Method::channelCloseOk, ) - ->recover(static fn() => Attempt::error(Failure::toCloseChannel())) + ->mapError(Failure::as(Failure::toCloseChannel())) ->flatMap( static fn() => $connection ->close() - ->recover(static fn() => Attempt::error(Failure::toCloseConnection())), + ->mapError(Failure::as(Failure::toCloseConnection())), ) - ->recover(static fn() => Attempt::error(Failure::closedBySignal($signal))); + ->mapError(Failure::as(Failure::closedBySignal($signal))); }, static fn() => $continue(), ); From 938645fd1868b7c63064244b81816d630ddd80a2 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 19:12:18 +0200 Subject: [PATCH 07/17] make MessageReader return an Attempt --- src/Command/Consume.php | 22 +++++++-------- src/Command/Get.php | 20 ++++++------- src/Consumer/Continuation.php | 8 ++---- src/Transport/Connection/MessageReader.php | 33 +++++++++------------- 4 files changed, 35 insertions(+), 48 deletions(-) diff --git a/src/Command/Consume.php b/src/Command/Consume.php index 6cf0a69..a9bae13 100644 --- a/src/Command/Consume.php +++ b/src/Command/Consume.php @@ -172,19 +172,17 @@ private function waitDeliver( return $connection ->wait(Method::basicDeliver) ->flatMap( - fn($received) => $read($connection) - ->attempt(static fn($failure) => $failure) - ->flatMap( - fn($message) => $this->maybeConsume( - $connection, - $channel, - $read, - $state, - $consumerTag, - $received->frame(), - $message, - ), + fn($received) => $read($connection)->flatMap( + fn($message) => $this->maybeConsume( + $connection, + $channel, + $read, + $state, + $consumerTag, + $received->frame(), + $message, ), + ), ) ->mapError(Failure::as(Failure::toConsume($this->command))); } diff --git a/src/Command/Get.php b/src/Command/Get.php index 75c4670..d92de37 100644 --- a/src/Command/Get.php +++ b/src/Command/Get.php @@ -166,18 +166,16 @@ private function maybeConsume( ->map(Details::ofGet(...)) ->attempt(fn() => Failure::toGet($this->command)) ->flatMap( - fn($details) => $read($connection) - ->attempt(static fn($failure) => $failure) - ->flatMap( - fn($message) => $this->consume( - $connection, - $channel, - $read, - $state, - $message, - $details, - ), + fn($details) => $read($connection)->flatMap( + fn($message) => $this->consume( + $connection, + $channel, + $read, + $state, + $message, + $details, ), + ), ); } diff --git a/src/Consumer/Continuation.php b/src/Consumer/Continuation.php index 386ef67..4bf94d9 100644 --- a/src/Consumer/Continuation.php +++ b/src/Consumer/Continuation.php @@ -125,11 +125,9 @@ private function recover( // read all the frames for the prefetched message then wait for next // frame $received = $received->flatMap( - static fn() => $read($connection) - ->attempt(static fn($failure) => $failure) - ->flatMap( - static fn() => $connection->wait(), - ), + static fn() => $read($connection)->flatMap( + static fn() => $connection->wait(), + ), ); $walkOverPrefetchedMessages = $received->match( static fn($received) => $received->is(Method::basicDeliver), diff --git a/src/Transport/Connection/MessageReader.php b/src/Transport/Connection/MessageReader.php index d4f929d..4ca1b02 100644 --- a/src/Transport/Connection/MessageReader.php +++ b/src/Transport/Connection/MessageReader.php @@ -30,7 +30,7 @@ Predicate\Instance, Sequence, Maybe, - Either, + Attempt, }; /** @@ -46,14 +46,12 @@ private function __construct(Filesystem $filesystem) } /** - * @return Either + * @return Attempt */ - public function __invoke(Connection $connection): Either + public function __invoke(Connection $connection): Attempt { return $connection ->wait() - ->either() - ->leftMap(static fn() => Failure::unexpectedFrame()) // todo delete ->flatMap(fn($received) => $this->decode( $connection, $received->frame(), @@ -66,19 +64,18 @@ public static function of(Filesystem $filesystem): self } /** - * @return Either + * @return Attempt */ private function decode( Connection $connection, Frame $header, - ): Either { + ): Attempt { return $header ->values() ->first() ->keep(Instance::of(Value\UnsignedLongLongInteger::class)) ->map(static fn($value) => $value->original()) - ->either() - ->leftMap(static fn() => Failure::toReadMessage()) + ->attempt(static fn() => Failure::toReadMessage()) ->flatMap(fn($bodySize) => $this->readMessage($connection, $bodySize)) ->flatMap( fn($message) => $header @@ -86,8 +83,7 @@ private function decode( ->get(1) ->keep(Instance::of(Value\UnsignedShortInteger::class)) ->map(static fn($value) => $value->original()) - ->either() - ->leftMap(static fn() => Failure::toReadMessage()) + ->attempt(static fn() => Failure::toReadMessage()) ->flatMap(fn($flagBits) => $this->addProperties( $message, $flagBits, @@ -101,13 +97,13 @@ private function decode( /** * @param Sequence $properties * - * @return Either + * @return Attempt */ private function addProperties( Message $message, int $flagBits, Sequence $properties, - ): Either { + ): Attempt { /** @var Sequence, Message): Maybe}> */ $toParse = Sequence::of( [ @@ -230,7 +226,6 @@ private function addProperties( * @psalm-suppress MixedArrayAccess * @psalm-suppress MixedArgument * @psalm-suppress MixedMethodCall - * @var Either */ return $toParse ->filter(static fn($pair) => (bool) ($flagBits & (1 << $pair[0]))) @@ -243,17 +238,16 @@ private function addProperties( ), ) ->map(static fn($state) => $state[1]) - ->either() - ->leftMap(static fn() => Failure::toReadMessage()); + ->attempt(static fn() => Failure::toReadMessage()); } /** - * @return Either + * @return Attempt */ private function readMessage( Connection $connection, int $bodySize, - ): Either { + ): Attempt { $chunks = Sequence::lazy(static function() use ($connection, $bodySize) { $continue = $bodySize !== 0; $read = 0; @@ -292,8 +286,7 @@ private function readMessage( }; return $content - ->either() ->map(Message::file(...)) - ->leftMap(static fn() => Failure::toReadMessage()); + ->mapError(Failure::as(Failure::toReadMessage())); } } From fd57ee5e6a76343ae96315af4bfc547813368b35 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 19:13:20 +0200 Subject: [PATCH 08/17] CS --- src/Transport/Connection/MessageReader.php | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Transport/Connection/MessageReader.php b/src/Transport/Connection/MessageReader.php index 4ca1b02..a18d3c5 100644 --- a/src/Transport/Connection/MessageReader.php +++ b/src/Transport/Connection/MessageReader.php @@ -230,12 +230,10 @@ private function addProperties( return $toParse ->filter(static fn($pair) => (bool) ($flagBits & (1 << $pair[0]))) ->map(static fn($pair) => $pair[1]) - ->reduce( - Maybe::just([$properties, $message]), - static fn(Maybe $state, $parse): Maybe => $state->flatMap( - static fn($state) => $parse($state[0]->first(), $state[1]) - ->map(static fn($message) => [$state[0]->drop(1), $message]), - ), + ->sink([$properties, $message]) + ->maybe( + static fn($state, $parse) => $parse($state[0]->first(), $state[1]) + ->map(static fn($message) => [$state[0]->drop(1), $message]), ) ->map(static fn($state) => $state[1]) ->attempt(static fn() => Failure::toReadMessage()); From 98c319860fc53b7c69997f822d0599d22ed93720 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 19:17:01 +0200 Subject: [PATCH 09/17] make Continuation return an Attempt --- src/Command/Consume.php | 3 +-- src/Command/Get.php | 3 +-- src/Consumer/Continuation.php | 44 +++++++++++++++-------------------- 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/src/Command/Consume.php b/src/Command/Consume.php index a9bae13..66b4e5d 100644 --- a/src/Command/Consume.php +++ b/src/Command/Consume.php @@ -268,7 +268,6 @@ private function consume( $read, $details->deliveryTag(), $consumerTag, - ) - ->attempt(static fn($failure) => $failure); + ); } } diff --git a/src/Command/Get.php b/src/Command/Get.php index d92de37..810da1b 100644 --- a/src/Command/Get.php +++ b/src/Command/Get.php @@ -206,7 +206,6 @@ private function consume( ->map(static fn($state) => match (true) { $state instanceof Canceled => $state->state(), default => $state, - }) - ->attempt(static fn($failure) => $failure); + }); } } diff --git a/src/Consumer/Continuation.php b/src/Consumer/Continuation.php index 4bf94d9..ed7ddee 100644 --- a/src/Consumer/Continuation.php +++ b/src/Consumer/Continuation.php @@ -17,7 +17,7 @@ Exception\BasicGetNotCancellable, }; use Innmind\Immutable\{ - Either, + Attempt, SideEffect, }; @@ -73,7 +73,7 @@ public function cancel(mixed $state): self * * @param int<0, max> $deliveryTag * - * @return Either + * @return Attempt */ public function respond( string $queue, @@ -82,8 +82,8 @@ public function respond( MessageReader $read, int $deliveryTag, ?string $consumerTag = null, - ): Either { - /** @var Either */ + ): Attempt { + /** @var Attempt */ return match ($this->response) { State::cancel => $this ->doAck($queue, $connection, $channel, $deliveryTag) @@ -107,14 +107,14 @@ public function respond( } /** - * @return Either + * @return Attempt */ private function recover( string $queue, Connection $connection, Channel $channel, MessageReader $read, - ): Either { + ): Attempt { $received = $connection->wait(); $walkOverPrefetchedMessages = $received->match( static fn($received) => $received->is(Method::basicDeliver), @@ -142,7 +142,6 @@ private function recover( // to "consume" within the same channel will receive new messages but // won't receive previously prefetched messages leading to out of order // messages handling - /** @var Either */ return $received ->flatMap(static fn() => $connection->request( static fn($protocol) => $protocol->basic()->recover( @@ -151,80 +150,76 @@ private function recover( ), Method::basicRecoverOk, )) - ->either() ->map(fn() => Canceled::of($this->state)) - ->leftMap(static fn() => Failure::toRecover($queue)); + ->mapError(Failure::as(Failure::toRecover($queue))); } /** * @param int<0, max> $deliveryTag * - * @return Either + * @return Attempt */ private function doAck( string $queue, Connection $connection, Channel $channel, int $deliveryTag, - ): Either { + ): Attempt { return $connection ->send(static fn($protocol) => $protocol->basic()->ack( $channel, Ack::of($deliveryTag), )) - ->either() - ->leftMap(static fn() => Failure::toAck($queue)); + ->mapError(Failure::as(Failure::toAck($queue))); } /** * @param int<0, max> $deliveryTag * - * @return Either + * @return Attempt */ private function doReject( string $queue, Connection $connection, Channel $channel, int $deliveryTag, - ): Either { + ): Attempt { return $connection ->send(static fn($protocol) => $protocol->basic()->reject( $channel, Reject::of($deliveryTag), )) - ->either() - ->leftMap(static fn() => Failure::toReject($queue)); + ->mapError(Failure::as(Failure::toReject($queue))); } /** * @param int<0, max> $deliveryTag * - * @return Either + * @return Attempt */ private function doRequeue( string $queue, Connection $connection, Channel $channel, int $deliveryTag, - ): Either { + ): Attempt { return $connection ->send(static fn($protocol) => $protocol->basic()->reject( $channel, Reject::requeue($deliveryTag), )) - ->either() - ->leftMap(static fn() => Failure::toReject($queue)); + ->mapError(Failure::as(Failure::toReject($queue))); } /** - * @return Either + * @return Attempt */ private function doCancel( string $queue, Connection $connection, Channel $channel, ?string $consumerTag, - ): Either { + ): Attempt { if (\is_null($consumerTag)) { // this means the user called self::cancel when inside a Get throw new BasicGetNotCancellable; @@ -235,7 +230,6 @@ private function doCancel( $channel, Cancel::of($consumerTag), )) - ->either() - ->leftMap(static fn() => Failure::toCancel($queue)); + ->mapError(Failure::as(Failure::toCancel($queue))); } } From 310040e794ca238c6370554238a4436393cd16dd Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 19:21:46 +0200 Subject: [PATCH 10/17] bump foundation --- CHANGELOG.md | 2 +- composer.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f29137..3801184 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### Changed -- Requires `innmind/foundation:~1.5` +- Requires `innmind/foundation:~1.6` - `Innmind\AMQP\Factory::make()` timeout argument is now expressed via `Innmind\TimeContinuum\Period` - `Innmind\AMQP\Model\Basic\Message` expiration is now expressed via `Innmind\TimeContinuum\Period` - `Innmind\AMQP\Failure` is now an exception that wraps each possible failure object diff --git a/composer.json b/composer.json index bf0177b..7b233ef 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ }, "require": { "php": "~8.2", - "innmind/foundation": "~1.5", + "innmind/foundation": "~1.6", "ramsey/uuid": "~4.0" }, "autoload": { From 102e3b74a54574d1f46c23859997447a796ff91e Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 19:52:30 +0200 Subject: [PATCH 11/17] return an error when the channel or frame size is incorrect instead of throwing --- CHANGELOG.md | 2 +- composer.json | 2 +- src/Model/Connection/MaxChannels.php | 12 +++++++--- src/Model/Connection/MaxFrameSize.php | 10 +++++--- src/Transport/Connection.php | 33 +++++++++++++-------------- 5 files changed, 34 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3801184..d8a6a93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### Changed -- Requires `innmind/foundation:~1.6` +- Requires `innmind/foundation:~1.7` - `Innmind\AMQP\Factory::make()` timeout argument is now expressed via `Innmind\TimeContinuum\Period` - `Innmind\AMQP\Model\Basic\Message` expiration is now expressed via `Innmind\TimeContinuum\Period` - `Innmind\AMQP\Failure` is now an exception that wraps each possible failure object diff --git a/composer.json b/composer.json index 7b233ef..4d23a77 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ }, "require": { "php": "~8.2", - "innmind/foundation": "~1.6", + "innmind/foundation": "~1.7", "ramsey/uuid": "~4.0" }, "autoload": { diff --git a/src/Model/Connection/MaxChannels.php b/src/Model/Connection/MaxChannels.php index 86264b0..0d41b4f 100644 --- a/src/Model/Connection/MaxChannels.php +++ b/src/Model/Connection/MaxChannels.php @@ -4,6 +4,10 @@ namespace Innmind\AMQP\Model\Connection; use Innmind\AMQP\Exception\FrameChannelExceedAllowedChannelNumber; +use Innmind\Immutable\{ + Attempt, + SideEffect, +}; /** * @psalm-immutable @@ -52,14 +56,16 @@ public function allows(int $channel): bool } /** - * @throws FrameChannelExceedAllowedChannelNumber + * @return Attempt */ #[\NoDiscard] - public function verify(int $channel): void + public function verify(int $channel): Attempt { if (!$this->allows($channel)) { - throw new FrameChannelExceedAllowedChannelNumber($channel, $this); + return Attempt::error(new FrameChannelExceedAllowedChannelNumber($channel, $this)); } + + return Attempt::result(SideEffect::identity()); } /** diff --git a/src/Model/Connection/MaxFrameSize.php b/src/Model/Connection/MaxFrameSize.php index e7958f7..970f293 100644 --- a/src/Model/Connection/MaxFrameSize.php +++ b/src/Model/Connection/MaxFrameSize.php @@ -10,6 +10,8 @@ use Innmind\Immutable\{ Sequence, Str, + Attempt, + SideEffect, }; /** @@ -69,14 +71,16 @@ public function allows(int $size): bool } /** - * @throws FrameExceedAllowedSize + * @return Attempt */ #[\NoDiscard] - public function verify(int $size): void + public function verify(int $size): Attempt { if (!$this->allows($size)) { - throw new FrameExceedAllowedSize($size, $this); + return Attempt::error(new FrameExceedAllowedSize($size, $this)); } + + return Attempt::result(SideEffect::identity()); } /** diff --git a/src/Transport/Connection.php b/src/Transport/Connection.php index 8186fbb..178f382 100644 --- a/src/Transport/Connection.php +++ b/src/Transport/Connection.php @@ -19,8 +19,6 @@ Model\Connection\MaxFrameSize, Model\Connection\TuneOk, Failure, - Exception\FrameChannelExceedAllowedChannelNumber, - Exception\FrameExceedAllowedSize, }; use Innmind\OperatingSystem\CurrentProcess\Signals; use Innmind\IO\{ @@ -250,30 +248,31 @@ private function flagActive(ReceivedFrame $received): ReceivedFrame * * @param callable(Protocol, MaxFrameSize): Sequence $frames * - * @throws FrameChannelExceedAllowedChannelNumber - * @throws FrameExceedAllowedSize - * * @return Attempt */ private function sendFrames(callable $frames): Attempt { $data = $frames($this->protocol, $this->maxFrameSize) - ->map(function($frame) { - $this->maxChannels->verify($frame->channel()->toInt()); - - return $frame; - }) - ->map(static fn($frame) => $frame->pack()) - ->map(function($frame) { - $this->maxFrameSize->verify($frame->length()); - - return $frame; - }); + ->map( + fn($frame) => $this + ->maxChannels + ->verify($frame->channel()->toInt()) + ->map(static fn() => $frame), + ) + ->map(static fn($frame) => $frame->map( + static fn($frame) => $frame->pack(), + )) + ->map(fn($frame) => $frame->flatMap( + fn($frame) => $this + ->maxFrameSize + ->verify($frame->length()) + ->map(static fn() => $frame), + )); return $this ->socket ->abortWhen($this->signals->notified(...)) - ->sink($data) + ->sinkAttempts($data) ->eitherWay( static fn() => Attempt::result(new SideEffect), fn() => $this->signals->close( From 7aadca75df75b7ce60348410a6b9ae1adc2a6432 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 20:16:20 +0200 Subject: [PATCH 12/17] fix tests --- composer.json | 2 +- tests/Model/Connection/MaxChannelsTest.php | 13 ++++++++++--- tests/Model/Connection/MaxFrameSizeTest.php | 13 ++++++++++--- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/composer.json b/composer.json index 4d23a77..50de1e0 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ }, "require": { "php": "~8.2", - "innmind/foundation": "~1.7", + "innmind/foundation": "^1.7.1", "ramsey/uuid": "~4.0" }, "autoload": { diff --git a/tests/Model/Connection/MaxChannelsTest.php b/tests/Model/Connection/MaxChannelsTest.php index 45c791c..a91f29a 100644 --- a/tests/Model/Connection/MaxChannelsTest.php +++ b/tests/Model/Connection/MaxChannelsTest.php @@ -7,6 +7,7 @@ Model\Connection\MaxChannels, Exception\FrameChannelExceedAllowedChannelNumber, }; +use Innmind\Immutable\SideEffect; use Innmind\BlackBox\{ PHPUnit\BlackBox, PHPUnit\Framework\TestCase, @@ -86,7 +87,10 @@ public function testVerifyAllowedNumbers() ->then(function($size) { $max = MaxChannels::of(0); - $this->assertNull($max->verify($size)); + $this->assertInstanceOf( + SideEffect::class, + $max->verify($size)->unwrap(), + ); }); $this ->forAll( @@ -96,7 +100,10 @@ public function testVerifyAllowedNumbers() ->then(function($allowed, $numberBelow) { $max = MaxChannels::of($allowed); - $this->assertNull($max->verify($allowed - $numberBelow)); + $this->assertInstanceOf( + SideEffect::class, + $max->verify($allowed - $numberBelow)->unwrap(), + ); }); } @@ -115,7 +122,7 @@ public function testThrowWhenVerifyingNumberAboveMaxAllowed(): BlackBox\Proof $above = $allowed + $extraNumber; try { - $max->verify($above); + $max->verify($above)->unwrap(); $this->fail('it should throw'); } catch (FrameChannelExceedAllowedChannelNumber $e) { $this->assertSame( diff --git a/tests/Model/Connection/MaxFrameSizeTest.php b/tests/Model/Connection/MaxFrameSizeTest.php index 85351d3..fe82319 100644 --- a/tests/Model/Connection/MaxFrameSizeTest.php +++ b/tests/Model/Connection/MaxFrameSizeTest.php @@ -7,6 +7,7 @@ Model\Connection\MaxFrameSize, Exception\FrameExceedAllowedSize, }; +use Innmind\Immutable\SideEffect; use Innmind\BlackBox\{ PHPUnit\BlackBox, PHPUnit\Framework\TestCase, @@ -86,7 +87,10 @@ public function testVerifyAllowedSizes() ->then(function($size) { $max = MaxFrameSize::of(0); - $this->assertNull($max->verify($size)); + $this->assertInstanceOf( + SideEffect::class, + $max->verify($size)->unwrap(), + ); }); $this ->forAll( @@ -96,7 +100,10 @@ public function testVerifyAllowedSizes() ->then(function($allowed, $sizeBelow) { $max = MaxFrameSize::of($allowed); - $this->assertNull($max->verify($allowed - $sizeBelow)); + $this->assertInstanceOf( + SideEffect::class, + $max->verify($allowed - $sizeBelow)->unwrap(), + ); }); } @@ -117,7 +124,7 @@ public function testThrowWhenVerifyingSizeAboveMaxAllowed(): BlackBox\Proof $this->expectException(FrameExceedAllowedSize::class); $this->expectExceptionMessage("Max frame size can be $allowed but got $above"); - $max->verify($above); + $max->verify($above)->unwrap(); }); } From b3624d33bc1fe719ae60b1fb2fe0452a672d2a42 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 20:20:40 +0200 Subject: [PATCH 13/17] remove useless code --- src/Failure/ToConsume.php | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/Failure/ToConsume.php b/src/Failure/ToConsume.php index 92905f9..a4a5a3e 100644 --- a/src/Failure/ToConsume.php +++ b/src/Failure/ToConsume.php @@ -3,9 +3,7 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Model\Basic\Consume as Command, -}; +use Innmind\AMQP\Model\Basic\Consume as Command; /** * @psalm-immutable @@ -27,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\NoDiscard] - public function kind(): Kind - { - return Kind::toConsume; - } } From 0c877df806ca59820dfb0b42b70a969f346b78f7 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 20:35:20 +0200 Subject: [PATCH 14/17] CS --- src/Transport/Connection.php | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/Transport/Connection.php b/src/Transport/Connection.php index 178f382..20031a5 100644 --- a/src/Transport/Connection.php +++ b/src/Transport/Connection.php @@ -252,22 +252,18 @@ private function flagActive(ReceivedFrame $received): ReceivedFrame */ private function sendFrames(callable $frames): Attempt { - $data = $frames($this->protocol, $this->maxFrameSize) - ->map( - fn($frame) => $this - ->maxChannels - ->verify($frame->channel()->toInt()) - ->map(static fn() => $frame), - ) - ->map(static fn($frame) => $frame->map( - static fn($frame) => $frame->pack(), - )) - ->map(fn($frame) => $frame->flatMap( - fn($frame) => $this - ->maxFrameSize - ->verify($frame->length()) - ->map(static fn() => $frame), - )); + $data = $frames($this->protocol, $this->maxFrameSize)->map( + fn($frame) => $this + ->maxChannels + ->verify($frame->channel()->toInt()) + ->map(static fn() => $frame->pack()) + ->flatMap( + fn($frame) => $this + ->maxFrameSize + ->verify($frame->length()) + ->map(static fn() => $frame), + ) + ); return $this ->socket From dbc2b09e011e6d20e3f8e975a37fb3dcaed4fdb2 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 20:38:01 +0200 Subject: [PATCH 15/17] flag Failure::as() as internal --- src/Failure.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Failure.php b/src/Failure.php index bf3a3c8..64f4946 100644 --- a/src/Failure.php +++ b/src/Failure.php @@ -18,6 +18,8 @@ private function __construct( } /** + * @internal + * * @return callable(\Throwable): \Throwable */ public static function as(self $failure): callable From 430c645459002912379fa46c293345721b037f32 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 20:44:25 +0200 Subject: [PATCH 16/17] CS --- src/Transport/Connection.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Transport/Connection.php b/src/Transport/Connection.php index 20031a5..8d868f4 100644 --- a/src/Transport/Connection.php +++ b/src/Transport/Connection.php @@ -262,7 +262,7 @@ private function sendFrames(callable $frames): Attempt ->maxFrameSize ->verify($frame->length()) ->map(static fn() => $frame), - ) + ), ); return $this From 2e20f6d95b12565c365e4968dc0f38906a307d66 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Fri, 8 Aug 2025 20:58:02 +0200 Subject: [PATCH 17/17] always return an error when signalled --- src/Transport/Connection/SignalListener.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Transport/Connection/SignalListener.php b/src/Transport/Connection/SignalListener.php index 7b93a1a..598c34d 100644 --- a/src/Transport/Connection/SignalListener.php +++ b/src/Transport/Connection/SignalListener.php @@ -114,7 +114,6 @@ function($in) use ($connection) { $this->closing = true; [$signal, $channel] = $in; - /** @var Attempt Todo fix */ return $connection ->request( static fn($protocol) => $protocol->channel()->close( @@ -129,7 +128,7 @@ function($in) use ($connection) { ->close() ->mapError(Failure::as(Failure::toCloseConnection())), ) - ->mapError(Failure::as(Failure::closedBySignal($signal))); + ->flatMap(static fn() => Attempt::error(Failure::closedBySignal($signal))); }, static fn() => $continue(), );