diff --git a/CHANGELOG.md b/CHANGELOG.md index 45b21fd..d8a6a93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,9 +4,12 @@ ### Changed -- Requires `innmind/foundation:~1.5` +- Requires `innmind/foundation:~1.7` - `Innmind\AMQP\Factory::make()` timeout argument is now expressed via `Innmind\TimeContinuum\Period` - `Innmind\AMQP\Model\Basic\Message` expiration is now expressed via `Innmind\TimeContinuum\Period` +- `Innmind\AMQP\Failure` is now an exception that wraps each possible failure object +- `Innmind\AMQP\Client::run()` now returns an `Innmind\Immutable\Attempt` +- `Innmind\AMQP\Command::__invoke()` now must return an `Innmind\Immutable\Attempt` ### Fixed diff --git a/composer.json b/composer.json index bf0177b..50de1e0 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ }, "require": { "php": "~8.2", - "innmind/foundation": "~1.5", + "innmind/foundation": "^1.7.1", "ramsey/uuid": "~4.0" }, "autoload": { diff --git a/src/Client.php b/src/Client.php index 4c4b1b1..095aff0 100644 --- a/src/Client.php +++ b/src/Client.php @@ -15,7 +15,7 @@ Filesystem, }; use Innmind\Immutable\{ - Either, + Attempt, Maybe, SideEffect, }; @@ -24,7 +24,7 @@ final class Client { /** @var Maybe */ private Maybe $command; - /** @var callable(): Maybe */ + /** @var callable(): Attempt */ private $load; private Filesystem $filesystem; /** @var Maybe */ @@ -32,7 +32,7 @@ final class Client /** * @param Maybe $command - * @param callable(): Maybe $load + * @param callable(): Attempt $load * @param Maybe $signals */ private function __construct( @@ -48,7 +48,7 @@ private function __construct( } /** - * @param callable(): Maybe $load + * @param callable(): Attempt $load */ #[\NoDiscard] public static function of(callable $load, Filesystem $filesystem): self @@ -96,10 +96,10 @@ public function listenSignals(CurrentProcess $currentProcess): self * * @param T $state * - * @return Either + * @return Attempt */ #[\NoDiscard] - public function run(mixed $state): Either + public function run(mixed $state): Attempt { return $this->command->match( fn($command) => $this @@ -114,24 +114,22 @@ public function run(mixed $state): Either ->map(static fn(): mixed => $state->unwrap()), ); }), - static fn() => Either::right($state), + static fn() => Attempt::result($state), ); } /** - * @return Either + * @return Attempt */ - private function openChannel(): Either + private function openChannel(): Attempt { // Since the connection is never shared between objects then there is no // need to have a dynamic channel number as there will ALWAYS be one // channel per connection $channel = new Channel(1); - /** @var Either */ return ($this->load)() - ->either() - ->leftMap(static fn() => Failure::toOpenConnection()) + ->mapError(Failure::as(Failure::toOpenConnection())) ->flatMap( fn($connection) => $connection ->request( @@ -146,16 +144,15 @@ private function openChannel(): Either static fn() => null, )) ->map(static fn() => [$connection, $channel]) - ->leftMap(static fn() => Failure::toOpenChannel()), + ->mapError(Failure::as(Failure::toOpenChannel())), ); } /** - * @return Either + * @return Attempt */ - private function close(Connection $connection, Channel $channel): Either + private function close(Connection $connection, Channel $channel): Attempt { - /** @var Either */ return $connection ->request( static fn($protocol) => $protocol->channel()->close( @@ -164,12 +161,11 @@ private function close(Connection $connection, Channel $channel): Either ), Method::channelCloseOk, ) - ->leftMap(static fn() => Failure::toCloseChannel()) + ->mapError(Failure::as(Failure::toCloseChannel())) ->flatMap( static fn() => $connection ->close() - ->either() - ->leftMap(static fn() => Failure::toCloseConnection()), + ->mapError(Failure::as(Failure::toCloseConnection())), ); } } diff --git a/src/Command.php b/src/Command.php index 0367caf..456b2bd 100644 --- a/src/Command.php +++ b/src/Command.php @@ -8,12 +8,12 @@ Frame\Channel, Connection\MessageReader, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; interface Command { /** - * @return Either + * @return Attempt */ #[\NoDiscard] public function __invoke( @@ -21,5 +21,5 @@ public function __invoke( Channel $channel, MessageReader $read, Client\State $state, - ): Either; + ): Attempt; } diff --git a/src/Command/Bind.php b/src/Command/Bind.php index 8e9af28..8a94029 100644 --- a/src/Command/Bind.php +++ b/src/Command/Bind.php @@ -15,7 +15,7 @@ Model\Queue\Binding, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, }; @@ -34,7 +34,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->queue()->bind( $channel, $this->command, @@ -47,7 +47,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toBind($this->command)); + ->mapError(Failure::as(Failure::toBind($this->command))); } #[\NoDiscard] diff --git a/src/Command/Consume.php b/src/Command/Consume.php index 8230c95..66b4e5d 100644 --- a/src/Command/Consume.php +++ b/src/Command/Consume.php @@ -22,7 +22,7 @@ }; use Innmind\Immutable\{ Maybe, - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -48,7 +48,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->basic()->consume( $channel, $this->command, @@ -69,8 +69,8 @@ public function __invoke( ->map(static fn() => $state), }; - return $sideEffect->leftMap( - fn() => Failure::toConsume($this->command), + return $sideEffect->mapError( + Failure::as(Failure::toConsume($this->command)), ); } @@ -93,7 +93,7 @@ public function handle(callable $consume): self } /** - * @return Either + * @return Attempt */ private function maybeStart( Connection $connection, @@ -101,14 +101,14 @@ private function maybeStart( MessageReader $read, Frame $frame, State $state, - ): Either { + ): Attempt { return $frame ->values() ->first() ->keep(Instance::of(Value\ShortString::class)) ->map(static fn($value) => $value->original()->toString()) ->either() - ->leftMap(fn() => Failure::toConsume($this->command)) + ->attempt(fn() => Failure::toConsume($this->command)) ->flatMap(fn($consumerTag) => $this->start( $connection, $channel, @@ -119,7 +119,7 @@ private function maybeStart( } /** - * @return Either + * @return Attempt */ private function start( Connection $connection, @@ -127,9 +127,9 @@ private function start( MessageReader $read, State $state, string $consumerTag, - ): Either { - /** @var Either */ - $consumed = Either::right($state); + ): Attempt { + /** @var Attempt */ + $consumed = Attempt::result($state); // here the best approach would be to use recursion to avoid unwrapping // the monads but it would end up with a too deep call stack for inifite // consumers as each new message would mean a new function call in the @@ -160,7 +160,7 @@ private function start( } /** - * @return Either + * @return Attempt */ private function waitDeliver( Connection $connection, @@ -168,8 +168,7 @@ private function waitDeliver( State $state, string $consumerTag, MessageReader $read, - ): Either { - /** @var Either */ + ): Attempt { return $connection ->wait(Method::basicDeliver) ->flatMap( @@ -185,11 +184,11 @@ private function waitDeliver( ), ), ) - ->leftMap(fn() => Failure::toConsume($this->command)); + ->mapError(Failure::as(Failure::toConsume($this->command))); } /** - * @return Either + * @return Attempt */ private function maybeConsume( Connection $connection, @@ -199,7 +198,7 @@ private function maybeConsume( string $consumerTag, Frame $frame, Message $message, - ): Either { + ): Attempt { $destinationConsumerTag = $frame ->values() ->first() @@ -232,8 +231,7 @@ private function maybeConsume( ->flatMap(static fn($details) => $destinationConsumerTag->map( static fn() => $details, // this manipulation is to make sure the consumerTag is indeed for this consumer )) - ->either() - ->leftMap(fn() => Failure::toConsume($this->command)) + ->attempt(fn() => Failure::toConsume($this->command)) ->flatMap(fn($details) => $this->consume( $connection, $channel, @@ -246,7 +244,7 @@ private function maybeConsume( } /** - * @return Either + * @return Attempt */ private function consume( Connection $connection, @@ -256,7 +254,7 @@ private function consume( Details $details, Message $message, string $consumerTag, - ): Either { + ): Attempt { return ($this->consume)( $state->unwrap(), $message, diff --git a/src/Command/DeclareExchange.php b/src/Command/DeclareExchange.php index d750cf4..0606be9 100644 --- a/src/Command/DeclareExchange.php +++ b/src/Command/DeclareExchange.php @@ -16,7 +16,7 @@ Failure, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, }; @@ -35,7 +35,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->exchange()->declare( $channel, $this->command, @@ -48,7 +48,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toDeclareExchange($this->command)); + ->mapError(Failure::as(Failure::toDeclareExchange($this->command))); } #[\NoDiscard] diff --git a/src/Command/DeclareQueue.php b/src/Command/DeclareQueue.php index 6da7e3a..4f922f6 100644 --- a/src/Command/DeclareQueue.php +++ b/src/Command/DeclareQueue.php @@ -19,7 +19,7 @@ }; use Innmind\Immutable\{ Maybe, - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -39,7 +39,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->queue()->declare( $channel, $this->command, @@ -71,14 +71,14 @@ public function __invoke( // maybe in the future we could expose this info to the user return Maybe::all($name, $message, $consumer) ->map(DeclareOk::of(...)) - ->either(); + ->attempt(static fn() => new \RuntimeException('Invalid declare.ok response')); }), false => $connection->send($frames), }; return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toDeclareQueue($this->command)); + ->mapError(Failure::as(Failure::toDeclareQueue($this->command))); } #[\NoDiscard] diff --git a/src/Command/DeleteExchange.php b/src/Command/DeleteExchange.php index 57e14bb..41d0559 100644 --- a/src/Command/DeleteExchange.php +++ b/src/Command/DeleteExchange.php @@ -15,7 +15,7 @@ Failure, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, }; @@ -34,7 +34,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->exchange()->delete( $channel, $this->command, @@ -47,7 +47,7 @@ public function __invoke( return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toDeleteExchange($this->command)); + ->mapError(Failure::as(Failure::toDeleteExchange($this->command))); } #[\NoDiscard] diff --git a/src/Command/DeleteQueue.php b/src/Command/DeleteQueue.php index ef5e7b7..12c989f 100644 --- a/src/Command/DeleteQueue.php +++ b/src/Command/DeleteQueue.php @@ -18,7 +18,7 @@ Failure, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -38,7 +38,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->queue()->delete( $channel, $this->command, @@ -57,14 +57,14 @@ public function __invoke( ->map(static fn($value) => $value->original()) ->map(Count::of(...)) ->map(DeleteOk::of(...)) - ->either(), + ->attempt(static fn() => new \RuntimeException('Unable to find message count')), ), false => $connection->send($frames), }; return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toDeleteQueue($this->command)); + ->mapError(Failure::as(Failure::toDeleteQueue($this->command))); } #[\NoDiscard] diff --git a/src/Command/Get.php b/src/Command/Get.php index cd91205..810da1b 100644 --- a/src/Command/Get.php +++ b/src/Command/Get.php @@ -22,7 +22,7 @@ }; use Innmind\Immutable\{ Maybe, - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -52,22 +52,15 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { - /** - * @psalm-suppress MixedArgumentTypeCoercion - * @var Either - */ - return Sequence::of(...\array_fill(0, $this->take, null))->reduce( - Either::right($state), - fn(Either $state) => $state->flatMap( - fn(State $state) => $this->doGet( - $connection, - $channel, - $read, - $state, - ), - ), - ); + ): Attempt { + return Sequence::of(...\array_fill(0, $this->take, null)) + ->sink($state) + ->attempt(fn($state) => $this->doGet( + $connection, + $channel, + $read, + $state, + )); } #[\NoDiscard] @@ -99,15 +92,14 @@ public function take(int $take): self } /** - * @return Either + * @return Attempt */ public function doGet( Connection $connection, Channel $channel, MessageReader $read, State $state, - ): Either { - /** @var Either */ + ): Attempt { return $connection ->request( fn($protocol) => $protocol->basic()->get( @@ -126,11 +118,11 @@ public function doGet( $state, ), ) - ->leftMap(fn() => Failure::toGet($this->command)); + ->mapError(Failure::as(Failure::toGet($this->command))); } /** - * @return Either + * @return Attempt */ private function maybeConsume( Connection $connection, @@ -138,10 +130,9 @@ private function maybeConsume( MessageReader $read, Frame $frame, State $state, - ): Either { + ): Attempt { if ($frame->is(Method::basicGetEmpty)) { - /** @var Either */ - return Either::right($state); + return Attempt::result($state); } $deliveryTag = $frame @@ -171,11 +162,9 @@ private function maybeConsume( ->map(static fn($value) => $value->original()) ->map(Count::of(...)); - /** @var Either */ return Maybe::all($deliveryTag, $redelivered, $exchange, $routingKey, $messageCount) ->map(Details::ofGet(...)) - ->either() - ->leftMap(fn() => Failure::toGet($this->command)) + ->attempt(fn() => Failure::toGet($this->command)) ->flatMap( fn($details) => $read($connection)->flatMap( fn($message) => $this->consume( @@ -191,7 +180,7 @@ private function maybeConsume( } /** - * @return Either + * @return Attempt */ private function consume( Connection $connection, @@ -200,7 +189,7 @@ private function consume( State $state, Message $message, Details $details, - ): Either { + ): Attempt { return ($this->consume)( $state->unwrap(), $message, diff --git a/src/Command/Pipe.php b/src/Command/Pipe.php index d4360cc..a13b272 100644 --- a/src/Command/Pipe.php +++ b/src/Command/Pipe.php @@ -10,7 +10,7 @@ Transport\Frame\Channel, Client\State, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; /** * @internal @@ -32,7 +32,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { return ($this->first)($connection, $channel, $read, $state)->flatMap( fn($state) => ($this->second)( $connection, diff --git a/src/Command/Publish.php b/src/Command/Publish.php index 39c128a..83e330b 100644 --- a/src/Command/Publish.php +++ b/src/Command/Publish.php @@ -14,7 +14,7 @@ Model\Basic\Message, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, SideEffect, }; @@ -38,16 +38,15 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { - /** @var Either */ + ): Attempt { return $this ->commands - ->reduce( - Either::right(new SideEffect), - fn(Either $state, $command) => $state->flatMap( - fn() => $this->publish($connection, $channel, $command), - ), - ) + ->sink(SideEffect::identity()) + ->attempt(fn($_, $command) => $this->publish( + $connection, + $channel, + $command, + )) ->map(static fn() => $state); } @@ -83,19 +82,19 @@ public function withRoutingKey(string $routingKey): self } /** - * @return Either + * @return Attempt */ private function publish( Connection $connection, Channel $channel, Model $command, - ): Either { + ): Attempt { return $connection ->send(static fn($protocol, $maxFrameSize) => $protocol->basic()->publish( $channel, $command, $maxFrameSize, )) - ->leftMap(static fn() => Failure::toPublish($command)); + ->mapError(Failure::as(Failure::toPublish($command))); } } diff --git a/src/Command/Purge.php b/src/Command/Purge.php index 89ddad2..7694181 100644 --- a/src/Command/Purge.php +++ b/src/Command/Purge.php @@ -18,7 +18,7 @@ Model\Count, }; use Innmind\Immutable\{ - Either, + Attempt, Sequence, Predicate\Instance, }; @@ -38,7 +38,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { $frames = fn(Protocol $protocol): Sequence => $protocol->queue()->purge( $channel, $this->command, @@ -57,14 +57,14 @@ public function __invoke( ->map(static fn($value) => $value->original()) ->map(Count::of(...)) ->map(PurgeOk::of(...)) - ->either(), + ->attempt(static fn() => new \RuntimeException('Unable to find message count')), ), false => $connection->send($frames), }; return $sideEffect ->map(static fn() => $state) - ->leftMap(fn() => Failure::toPurge($this->command)); + ->mapError(Failure::as(Failure::toPurge($this->command))); } #[\NoDiscard] diff --git a/src/Command/Qos.php b/src/Command/Qos.php index 4959c99..1dbc95a 100644 --- a/src/Command/Qos.php +++ b/src/Command/Qos.php @@ -13,7 +13,7 @@ Transport\Frame\Method, Model\Basic\Qos as Model, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; final class Qos implements Command { @@ -30,7 +30,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { return $connection ->request( fn($protocol) => $protocol->basic()->qos( @@ -40,7 +40,7 @@ public function __invoke( Method::basicQosOk, ) ->map(static fn() => $state) - ->leftMap(static fn() => Failure::toAdjustQos()); + ->mapError(Failure::as(Failure::toAdjustQos())); } /** diff --git a/src/Command/Transaction.php b/src/Command/Transaction.php index 1d0524d..b5c78f4 100644 --- a/src/Command/Transaction.php +++ b/src/Command/Transaction.php @@ -12,7 +12,7 @@ Transport\Frame\Channel, Transport\Frame\Method, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; final class Transaction implements Command { @@ -35,7 +35,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { return $this ->select($connection, $channel) ->flatMap(fn($connection) => ($this->command)( @@ -72,29 +72,29 @@ public function with(Command $command): self } /** - * @return Either + * @return Attempt */ private function select( Connection $connection, Channel $channel, - ): Either { + ): Attempt { return $connection ->request( static fn($protocol) => $protocol->transaction()->select($channel), Method::transactionSelectOk, ) ->map(static fn() => $connection) - ->leftMap(static fn() => Failure::toSelect()); + ->mapError(Failure::as(Failure::toSelect())); } /** - * @return Either + * @return Attempt */ private function finish( Connection $connection, Channel $channel, State $state, - ): Either { + ): Attempt { return match (($this->predicate)($state->unwrap())) { true => $this->commit($connection, $channel, $state), false => $this->rollback($connection, $channel, $state), @@ -102,36 +102,36 @@ private function finish( } /** - * @return Either + * @return Attempt */ private function commit( Connection $connection, Channel $channel, State $state, - ): Either { + ): Attempt { return $connection ->request( static fn($protocol) => $protocol->transaction()->commit($channel), Method::transactionCommitOk, ) ->map(static fn() => $state) - ->leftMap(static fn() => Failure::toCommit()); + ->mapError(Failure::as(Failure::toCommit())); } /** - * @return Either + * @return Attempt */ private function rollback( Connection $connection, Channel $channel, State $state, - ): Either { + ): Attempt { return $connection ->request( static fn($protocol) => $protocol->transaction()->rollback($channel), Method::transactionRollbackOk, ) ->map(static fn() => $state) - ->leftMap(static fn() => Failure::toRollback()); + ->mapError(Failure::as(Failure::toRollback())); } } diff --git a/src/Command/Unbind.php b/src/Command/Unbind.php index 7e82087..64ec6b6 100644 --- a/src/Command/Unbind.php +++ b/src/Command/Unbind.php @@ -13,7 +13,7 @@ Transport\Frame\Method, Model\Queue\Unbinding, }; -use Innmind\Immutable\Either; +use Innmind\Immutable\Attempt; final class Unbind implements Command { @@ -30,7 +30,7 @@ public function __invoke( Channel $channel, MessageReader $read, State $state, - ): Either { + ): Attempt { return $connection ->request( fn($protocol) => $protocol->queue()->unbind( @@ -40,7 +40,7 @@ public function __invoke( Method::queueUnbindOk, ) ->map(static fn() => $state) - ->leftMap(fn() => Failure::toUnbind($this->command)); + ->mapError(Failure::as(Failure::toUnbind($this->command))); } #[\NoDiscard] diff --git a/src/Consumer/Continuation.php b/src/Consumer/Continuation.php index 8cd1453..ed7ddee 100644 --- a/src/Consumer/Continuation.php +++ b/src/Consumer/Continuation.php @@ -17,7 +17,7 @@ Exception\BasicGetNotCancellable, }; use Innmind\Immutable\{ - Either, + Attempt, SideEffect, }; @@ -73,7 +73,7 @@ public function cancel(mixed $state): self * * @param int<0, max> $deliveryTag * - * @return Either + * @return Attempt */ public function respond( string $queue, @@ -82,8 +82,8 @@ public function respond( MessageReader $read, int $deliveryTag, ?string $consumerTag = null, - ): Either { - /** @var Either */ + ): Attempt { + /** @var Attempt */ return match ($this->response) { State::cancel => $this ->doAck($queue, $connection, $channel, $deliveryTag) @@ -107,14 +107,14 @@ public function respond( } /** - * @return Either + * @return Attempt */ private function recover( string $queue, Connection $connection, Channel $channel, MessageReader $read, - ): Either { + ): Attempt { $received = $connection->wait(); $walkOverPrefetchedMessages = $received->match( static fn($received) => $received->is(Method::basicDeliver), @@ -142,7 +142,6 @@ private function recover( // to "consume" within the same channel will receive new messages but // won't receive previously prefetched messages leading to out of order // messages handling - /** @var Either */ return $received ->flatMap(static fn() => $connection->request( static fn($protocol) => $protocol->basic()->recover( @@ -152,75 +151,75 @@ private function recover( Method::basicRecoverOk, )) ->map(fn() => Canceled::of($this->state)) - ->leftMap(static fn() => Failure::toRecover($queue)); + ->mapError(Failure::as(Failure::toRecover($queue))); } /** * @param int<0, max> $deliveryTag * - * @return Either + * @return Attempt */ private function doAck( string $queue, Connection $connection, Channel $channel, int $deliveryTag, - ): Either { + ): Attempt { return $connection ->send(static fn($protocol) => $protocol->basic()->ack( $channel, Ack::of($deliveryTag), )) - ->leftMap(static fn() => Failure::toAck($queue)); + ->mapError(Failure::as(Failure::toAck($queue))); } /** * @param int<0, max> $deliveryTag * - * @return Either + * @return Attempt */ private function doReject( string $queue, Connection $connection, Channel $channel, int $deliveryTag, - ): Either { + ): Attempt { return $connection ->send(static fn($protocol) => $protocol->basic()->reject( $channel, Reject::of($deliveryTag), )) - ->leftMap(static fn() => Failure::toReject($queue)); + ->mapError(Failure::as(Failure::toReject($queue))); } /** * @param int<0, max> $deliveryTag * - * @return Either + * @return Attempt */ private function doRequeue( string $queue, Connection $connection, Channel $channel, int $deliveryTag, - ): Either { + ): Attempt { return $connection ->send(static fn($protocol) => $protocol->basic()->reject( $channel, Reject::requeue($deliveryTag), )) - ->leftMap(static fn() => Failure::toReject($queue)); + ->mapError(Failure::as(Failure::toReject($queue))); } /** - * @return Either + * @return Attempt */ private function doCancel( string $queue, Connection $connection, Channel $channel, ?string $consumerTag, - ): Either { + ): Attempt { if (\is_null($consumerTag)) { // this means the user called self::cancel when inside a Get throw new BasicGetNotCancellable; @@ -231,6 +230,6 @@ private function doCancel( $channel, Cancel::of($consumerTag), )) - ->leftMap(static fn() => Failure::toCancel($queue)); + ->mapError(Failure::as(Failure::toCancel($queue))); } } diff --git a/src/Failure.php b/src/Failure.php index d11c2f5..64f4946 100644 --- a/src/Failure.php +++ b/src/Failure.php @@ -7,165 +7,262 @@ use Innmind\Signals\Signal; use Innmind\Immutable\Maybe; -/** - * @psalm-immutable - */ -abstract class Failure +final class Failure extends Exception\RuntimeException { + private function __construct( + private object $failure, + private Failure\Kind $kind, + ?\Throwable $previous = null, + ) { + parent::__construct('', 0, $previous); + } + + /** + * @internal + * + * @return callable(\Throwable): \Throwable + */ + public static function as(self $failure): callable + { + return static fn(\Throwable $e) => new self( + $failure->failure, + $failure->kind, + $e, + ); + } + #[\NoDiscard] - final public static function toOpenConnection(): self + public static function toOpenConnection(): self { - return new Failure\ToOpenConnection; + return new self( + new Failure\ToOpenConnection, + Failure\Kind::toOpenConnection, + ); } #[\NoDiscard] - final public static function toOpenChannel(): self + public static function toOpenChannel(): self { - return new Failure\ToOpenChannel; + return new self( + new Failure\ToOpenChannel, + Failure\Kind::toOpenChannel, + ); } #[\NoDiscard] - final public static function toCloseChannel(): self + public static function toCloseChannel(): self { - return new Failure\ToCloseChannel; + return new self( + new Failure\ToCloseChannel, + Failure\Kind::toCloseChannel, + ); } #[\NoDiscard] - final public static function toCloseConnection(): self + public static function toCloseConnection(): self { - return new Failure\ToCloseConnection; + return new self( + new Failure\ToCloseConnection, + Failure\Kind::toCloseConnection, + ); } #[\NoDiscard] - final public static function toDeleteQueue(Model\Queue\Deletion $command): self + public static function toDeleteQueue(Model\Queue\Deletion $command): self { - return new Failure\ToDeleteQueue($command); + return new self( + new Failure\ToDeleteQueue($command), + Failure\Kind::toDeleteQueue, + ); } #[\NoDiscard] - final public static function toDeleteExchange(Model\Exchange\Deletion $command): self + public static function toDeleteExchange(Model\Exchange\Deletion $command): self { - return new Failure\ToDeleteExchange($command); + return new self( + new Failure\ToDeleteExchange($command), + Failure\Kind::toDeleteExchange, + ); } #[\NoDiscard] - final public static function toDeclareQueue(Model\Queue\Declaration $command): self + public static function toDeclareQueue(Model\Queue\Declaration $command): self { - return new Failure\ToDeclareQueue($command); + return new self( + new Failure\ToDeclareQueue($command), + Failure\Kind::toDeclareQueue, + ); } #[\NoDiscard] - final public static function toDeclareExchange(Model\Exchange\Declaration $command): self + public static function toDeclareExchange(Model\Exchange\Declaration $command): self { - return new Failure\ToDeclareExchange($command); + return new self( + new Failure\ToDeclareExchange($command), + Failure\Kind::toDeclareExchange, + ); } #[\NoDiscard] - final public static function toBind(Model\Queue\Binding $command): self + public static function toBind(Model\Queue\Binding $command): self { - return new Failure\ToBind($command); + return new self( + new Failure\ToBind($command), + Failure\Kind::toBind, + ); } #[\NoDiscard] - final public static function toUnbind(Model\Queue\Unbinding $command): self + public static function toUnbind(Model\Queue\Unbinding $command): self { - return new Failure\ToUnbind($command); + return new self( + new Failure\ToUnbind($command), + Failure\Kind::toUnbind, + ); } #[\NoDiscard] - final public static function toPurge(Model\Queue\Purge $command): self + public static function toPurge(Model\Queue\Purge $command): self { - return new Failure\ToPurge($command); + return new self( + new Failure\ToPurge($command), + Failure\Kind::toPurge, + ); } #[\NoDiscard] - final public static function toAdjustQos(): self + public static function toAdjustQos(): self { - return new Failure\ToAdjustQos; + return new self( + new Failure\ToAdjustQos, + Failure\Kind::toAdjustQos, + ); } #[\NoDiscard] - final public static function toPublish(Model\Basic\Publish $command): self + public static function toPublish(Model\Basic\Publish $command): self { - return new Failure\ToPublish($command); + return new self( + new Failure\ToPublish($command), + Failure\Kind::toPublish, + ); } #[\NoDiscard] - final public static function toGet(Model\Basic\Get $command): self + public static function toGet(Model\Basic\Get $command): self { - return new Failure\ToGet($command); + return new self( + new Failure\ToGet($command), + Failure\Kind::toGet, + ); } #[\NoDiscard] - final public static function toConsume(Model\Basic\Consume $command): self + public static function toConsume(Model\Basic\Consume $command): self { - return new Failure\ToConsume($command); + return new self( + new Failure\ToConsume($command), + Failure\Kind::toConsume, + ); } #[\NoDiscard] - final public static function toAck(string $queue): self + public static function toAck(string $queue): self { - return new Failure\ToAck($queue); + return new self( + new Failure\ToAck($queue), + Failure\Kind::toAck, + ); } #[\NoDiscard] - final public static function toReject(string $queue): self + public static function toReject(string $queue): self { - return new Failure\ToReject($queue); + return new self( + new Failure\ToReject($queue), + Failure\Kind::toReject, + ); } #[\NoDiscard] - final public static function toCancel(string $queue): self + public static function toCancel(string $queue): self { - return new Failure\ToCancel($queue); + return new self( + new Failure\ToCancel($queue), + Failure\Kind::toCancel, + ); } #[\NoDiscard] - final public static function toRecover(string $queue): self + public static function toRecover(string $queue): self { - return new Failure\ToRecover($queue); + return new self( + new Failure\ToRecover($queue), + Failure\Kind::toRecover, + ); } #[\NoDiscard] - final public static function toSelect(): self + public static function toSelect(): self { - return new Failure\ToSelect; + return new self( + new Failure\ToSelect, + Failure\Kind::toSelect, + ); } #[\NoDiscard] - final public static function toCommit(): self + public static function toCommit(): self { - return new Failure\ToCommit; + return new self( + new Failure\ToCommit, + Failure\Kind::toCommit, + ); } #[\NoDiscard] - final public static function toRollback(): self + public static function toRollback(): self { - return new Failure\ToRollback; + return new self( + new Failure\ToRollback, + Failure\Kind::toRollback, + ); } #[\NoDiscard] - final public static function toSendFrame(): self + public static function toSendFrame(): self { - return new Failure\ToSendFrame; + return new self( + new Failure\ToSendFrame, + Failure\Kind::toSendFrame, + ); } #[\NoDiscard] - final public static function toReadFrame(): self + public static function toReadFrame(): self { - return new Failure\ToReadFrame; + return new self( + new Failure\ToReadFrame, + Failure\Kind::toReadFrame, + ); } #[\NoDiscard] - final public static function toReadMessage(): self + public static function toReadMessage(): self { - return new Failure\ToReadMessage; + return new self( + new Failure\ToReadMessage, + Failure\Kind::toReadMessage, + ); } #[\NoDiscard] - final public static function unexpectedFrame(): self + public static function unexpectedFrame(): self { - return new Failure\UnexpectedFrame; + return new self( + new Failure\UnexpectedFrame, + Failure\Kind::unexpectedFrame, + ); } /** @@ -173,20 +270,35 @@ final public static function unexpectedFrame(): self * @param Maybe $method */ #[\NoDiscard] - final public static function closedByServer( + public static function closedByServer( string $message, int $code, Maybe $method, ): self { - return new Failure\ClosedByServer($message, $code, $method); + return new self( + new Failure\ClosedByServer($message, $code, $method), + Failure\Kind::closedByServer, + ); } #[\NoDiscard] - final public static function closedBySignal(Signal $signal): self + public static function closedBySignal(Signal $signal): self { - return new Failure\ClosedBySignal($signal); + return new self( + new Failure\ClosedBySignal($signal), + Failure\Kind::closedBySignal, + ); } #[\NoDiscard] - abstract public function kind(): Failure\Kind; + public function unwrap(): object + { + return $this->failure; + } + + #[\NoDiscard] + public function kind(): Failure\Kind + { + return $this->kind; + } } diff --git a/src/Failure/ClosedByServer.php b/src/Failure/ClosedByServer.php index 975ce79..7fa8c03 100644 --- a/src/Failure/ClosedByServer.php +++ b/src/Failure/ClosedByServer.php @@ -3,16 +3,13 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Transport\Frame\Method, -}; +use Innmind\AMQP\Transport\Frame\Method; use Innmind\Immutable\Maybe; /** * @psalm-immutable */ -final class ClosedByServer extends Failure +final class ClosedByServer { private string $message; /** @var int<0, 65535> */ @@ -33,12 +30,6 @@ public function __construct(string $message, int $code, Maybe $method) $this->method = $method; } - #[\Override] - public function kind(): Kind - { - return Kind::closedByServer; - } - #[\NoDiscard] public function message(): string { diff --git a/src/Failure/ClosedBySignal.php b/src/Failure/ClosedBySignal.php index 7f31f86..0bf97e2 100644 --- a/src/Failure/ClosedBySignal.php +++ b/src/Failure/ClosedBySignal.php @@ -3,13 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; use Innmind\Signals\Signal; /** * @psalm-immutable */ -final class ClosedBySignal extends Failure +final class ClosedBySignal { private Signal $signal; @@ -21,12 +20,6 @@ public function __construct(Signal $signal) $this->signal = $signal; } - #[\Override] - public function kind(): Kind - { - return Kind::closedBySignal; - } - #[\NoDiscard] public function signal(): Signal { diff --git a/src/Failure/ToAck.php b/src/Failure/ToAck.php index 31a39f1..ec61c19 100644 --- a/src/Failure/ToAck.php +++ b/src/Failure/ToAck.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToAck extends Failure +final class ToAck { private string $queue; @@ -20,12 +18,6 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\Override] - public function kind(): Kind - { - return Kind::toAck; - } - #[\NoDiscard] public function queue(): string { diff --git a/src/Failure/ToAdjustQos.php b/src/Failure/ToAdjustQos.php index 9d42910..5feff22 100644 --- a/src/Failure/ToAdjustQos.php +++ b/src/Failure/ToAdjustQos.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToAdjustQos extends Failure +final class ToAdjustQos { /** * @internal @@ -16,10 +14,4 @@ final class ToAdjustQos extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toAdjustQos; - } } diff --git a/src/Failure/ToBind.php b/src/Failure/ToBind.php index 0cf4c16..334f9cf 100644 --- a/src/Failure/ToBind.php +++ b/src/Failure/ToBind.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Queue\Binding as Command, -}; +use Innmind\AMQP\Model\Queue\Binding as Command; /** * @psalm-immutable */ -final class ToBind extends Failure +final class ToBind { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toBind; - } } diff --git a/src/Failure/ToCancel.php b/src/Failure/ToCancel.php index 1ba3fe4..a6230a1 100644 --- a/src/Failure/ToCancel.php +++ b/src/Failure/ToCancel.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToCancel extends Failure +final class ToCancel { private string $queue; @@ -20,12 +18,6 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\Override] - public function kind(): Kind - { - return Kind::toCancel; - } - #[\NoDiscard] public function queue(): string { diff --git a/src/Failure/ToCloseChannel.php b/src/Failure/ToCloseChannel.php index 854d538..1e31a36 100644 --- a/src/Failure/ToCloseChannel.php +++ b/src/Failure/ToCloseChannel.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToCloseChannel extends Failure +final class ToCloseChannel { /** * @internal @@ -16,10 +14,4 @@ final class ToCloseChannel extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toCloseChannel; - } } diff --git a/src/Failure/ToCloseConnection.php b/src/Failure/ToCloseConnection.php index 3056ed0..a2c1c7d 100644 --- a/src/Failure/ToCloseConnection.php +++ b/src/Failure/ToCloseConnection.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToCloseConnection extends Failure +final class ToCloseConnection { /** * @internal @@ -16,10 +14,4 @@ final class ToCloseConnection extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toCloseConnection; - } } diff --git a/src/Failure/ToCommit.php b/src/Failure/ToCommit.php index 0ea5c7a..2cb20b9 100644 --- a/src/Failure/ToCommit.php +++ b/src/Failure/ToCommit.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToCommit extends Failure +final class ToCommit { /** * @internal @@ -16,10 +14,4 @@ final class ToCommit extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toCommit; - } } diff --git a/src/Failure/ToConsume.php b/src/Failure/ToConsume.php index 2757c38..a4a5a3e 100644 --- a/src/Failure/ToConsume.php +++ b/src/Failure/ToConsume.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Basic\Consume as Command, -}; +use Innmind\AMQP\Model\Basic\Consume as Command; /** * @psalm-immutable */ -final class ToConsume extends Failure +final class ToConsume { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toConsume; - } } diff --git a/src/Failure/ToDeclareExchange.php b/src/Failure/ToDeclareExchange.php index 52c2cb5..624b2c1 100644 --- a/src/Failure/ToDeclareExchange.php +++ b/src/Failure/ToDeclareExchange.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Exchange\Declaration as Command, -}; +use Innmind\AMQP\Model\Exchange\Declaration as Command; /** * @psalm-immutable */ -final class ToDeclareExchange extends Failure +final class ToDeclareExchange { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toDeclareExchange; - } } diff --git a/src/Failure/ToDeclareQueue.php b/src/Failure/ToDeclareQueue.php index b7fe4cb..7343f3c 100644 --- a/src/Failure/ToDeclareQueue.php +++ b/src/Failure/ToDeclareQueue.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Queue\Declaration as Command, -}; +use Innmind\AMQP\Model\Queue\Declaration as Command; /** * @psalm-immutable */ -final class ToDeclareQueue extends Failure +final class ToDeclareQueue { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toDeclareQueue; - } } diff --git a/src/Failure/ToDeleteExchange.php b/src/Failure/ToDeleteExchange.php index 8d4990b..2bd5e17 100644 --- a/src/Failure/ToDeleteExchange.php +++ b/src/Failure/ToDeleteExchange.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Exchange\Deletion as Command, -}; +use Innmind\AMQP\Model\Exchange\Deletion as Command; /** * @psalm-immutable */ -final class ToDeleteExchange extends Failure +final class ToDeleteExchange { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toDeleteExchange; - } } diff --git a/src/Failure/ToDeleteQueue.php b/src/Failure/ToDeleteQueue.php index 35b618c..0699eea 100644 --- a/src/Failure/ToDeleteQueue.php +++ b/src/Failure/ToDeleteQueue.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Queue\Deletion as Command, -}; +use Innmind\AMQP\Model\Queue\Deletion as Command; /** * @psalm-immutable */ -final class ToDeleteQueue extends Failure +final class ToDeleteQueue { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toDeleteQueue; - } } diff --git a/src/Failure/ToGet.php b/src/Failure/ToGet.php index 0d708f3..0042109 100644 --- a/src/Failure/ToGet.php +++ b/src/Failure/ToGet.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Basic\Get as Command, -}; +use Innmind\AMQP\Model\Basic\Get as Command; /** * @psalm-immutable */ -final class ToGet extends Failure +final class ToGet { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toGet; - } } diff --git a/src/Failure/ToOpenChannel.php b/src/Failure/ToOpenChannel.php index dfefb3b..9ae1c20 100644 --- a/src/Failure/ToOpenChannel.php +++ b/src/Failure/ToOpenChannel.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToOpenChannel extends Failure +final class ToOpenChannel { /** * @internal @@ -16,10 +14,4 @@ final class ToOpenChannel extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toOpenChannel; - } } diff --git a/src/Failure/ToOpenConnection.php b/src/Failure/ToOpenConnection.php index 8a5423c..c5ddf14 100644 --- a/src/Failure/ToOpenConnection.php +++ b/src/Failure/ToOpenConnection.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToOpenConnection extends Failure +final class ToOpenConnection { /** * @internal @@ -16,10 +14,4 @@ final class ToOpenConnection extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toOpenConnection; - } } diff --git a/src/Failure/ToPublish.php b/src/Failure/ToPublish.php index a428060..de2e15b 100644 --- a/src/Failure/ToPublish.php +++ b/src/Failure/ToPublish.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Basic\Publish as Command, -}; +use Innmind\AMQP\Model\Basic\Publish as Command; /** * @psalm-immutable */ -final class ToPublish extends Failure +final class ToPublish { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toPublish; - } } diff --git a/src/Failure/ToPurge.php b/src/Failure/ToPurge.php index 384c2e4..df062b7 100644 --- a/src/Failure/ToPurge.php +++ b/src/Failure/ToPurge.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Queue\Purge as Command, -}; +use Innmind\AMQP\Model\Queue\Purge as Command; /** * @psalm-immutable */ -final class ToPurge extends Failure +final class ToPurge { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toPurge; - } } diff --git a/src/Failure/ToReadFrame.php b/src/Failure/ToReadFrame.php index 2450975..62f61d1 100644 --- a/src/Failure/ToReadFrame.php +++ b/src/Failure/ToReadFrame.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToReadFrame extends Failure +final class ToReadFrame { /** * @internal @@ -16,10 +14,4 @@ final class ToReadFrame extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toReadFrame; - } } diff --git a/src/Failure/ToReadMessage.php b/src/Failure/ToReadMessage.php index 31561c0..cef39cb 100644 --- a/src/Failure/ToReadMessage.php +++ b/src/Failure/ToReadMessage.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToReadMessage extends Failure +final class ToReadMessage { /** * @internal @@ -16,10 +14,4 @@ final class ToReadMessage extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toReadMessage; - } } diff --git a/src/Failure/ToRecover.php b/src/Failure/ToRecover.php index f433926..5b989b1 100644 --- a/src/Failure/ToRecover.php +++ b/src/Failure/ToRecover.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToRecover extends Failure +final class ToRecover { private string $queue; @@ -20,12 +18,6 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\Override] - public function kind(): Kind - { - return Kind::toRecover; - } - #[\NoDiscard] public function queue(): string { diff --git a/src/Failure/ToReject.php b/src/Failure/ToReject.php index 43ca45d..fe445a5 100644 --- a/src/Failure/ToReject.php +++ b/src/Failure/ToReject.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToReject extends Failure +final class ToReject { private string $queue; @@ -20,12 +18,6 @@ public function __construct(string $queue) $this->queue = $queue; } - #[\Override] - public function kind(): Kind - { - return Kind::toReject; - } - #[\NoDiscard] public function queue(): string { diff --git a/src/Failure/ToRollback.php b/src/Failure/ToRollback.php index 493c069..a0c9305 100644 --- a/src/Failure/ToRollback.php +++ b/src/Failure/ToRollback.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToRollback extends Failure +final class ToRollback { /** * @internal @@ -16,10 +14,4 @@ final class ToRollback extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toRollback; - } } diff --git a/src/Failure/ToSelect.php b/src/Failure/ToSelect.php index a6fe971..fa68a9a 100644 --- a/src/Failure/ToSelect.php +++ b/src/Failure/ToSelect.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToSelect extends Failure +final class ToSelect { /** * @internal @@ -16,10 +14,4 @@ final class ToSelect extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toSelect; - } } diff --git a/src/Failure/ToSendFrame.php b/src/Failure/ToSendFrame.php index 1a0a6d9..f3f709d 100644 --- a/src/Failure/ToSendFrame.php +++ b/src/Failure/ToSendFrame.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class ToSendFrame extends Failure +final class ToSendFrame { /** * @internal @@ -16,10 +14,4 @@ final class ToSendFrame extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::toSendFrame; - } } diff --git a/src/Failure/ToUnbind.php b/src/Failure/ToUnbind.php index f237f6c..c05aee9 100644 --- a/src/Failure/ToUnbind.php +++ b/src/Failure/ToUnbind.php @@ -3,15 +3,12 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\{ - Failure, - Model\Queue\Unbinding as Command, -}; +use Innmind\AMQP\Model\Queue\Unbinding as Command; /** * @psalm-immutable */ -final class ToUnbind extends Failure +final class ToUnbind { private Command $command; @@ -28,10 +25,4 @@ public function command(): Command { return $this->command; } - - #[\Override] - public function kind(): Kind - { - return Kind::toUnbind; - } } diff --git a/src/Failure/UnexpectedFrame.php b/src/Failure/UnexpectedFrame.php index bc57cfc..45f9caf 100644 --- a/src/Failure/UnexpectedFrame.php +++ b/src/Failure/UnexpectedFrame.php @@ -3,12 +3,10 @@ namespace Innmind\AMQP\Failure; -use Innmind\AMQP\Failure; - /** * @psalm-immutable */ -final class UnexpectedFrame extends Failure +final class UnexpectedFrame { /** * @internal @@ -16,10 +14,4 @@ final class UnexpectedFrame extends Failure public function __construct() { } - - #[\Override] - public function kind(): Kind - { - return Kind::unexpectedFrame; - } } diff --git a/src/Model/Connection/MaxChannels.php b/src/Model/Connection/MaxChannels.php index 86264b0..0d41b4f 100644 --- a/src/Model/Connection/MaxChannels.php +++ b/src/Model/Connection/MaxChannels.php @@ -4,6 +4,10 @@ namespace Innmind\AMQP\Model\Connection; use Innmind\AMQP\Exception\FrameChannelExceedAllowedChannelNumber; +use Innmind\Immutable\{ + Attempt, + SideEffect, +}; /** * @psalm-immutable @@ -52,14 +56,16 @@ public function allows(int $channel): bool } /** - * @throws FrameChannelExceedAllowedChannelNumber + * @return Attempt */ #[\NoDiscard] - public function verify(int $channel): void + public function verify(int $channel): Attempt { if (!$this->allows($channel)) { - throw new FrameChannelExceedAllowedChannelNumber($channel, $this); + return Attempt::error(new FrameChannelExceedAllowedChannelNumber($channel, $this)); } + + return Attempt::result(SideEffect::identity()); } /** diff --git a/src/Model/Connection/MaxFrameSize.php b/src/Model/Connection/MaxFrameSize.php index e7958f7..970f293 100644 --- a/src/Model/Connection/MaxFrameSize.php +++ b/src/Model/Connection/MaxFrameSize.php @@ -10,6 +10,8 @@ use Innmind\Immutable\{ Sequence, Str, + Attempt, + SideEffect, }; /** @@ -69,14 +71,16 @@ public function allows(int $size): bool } /** - * @throws FrameExceedAllowedSize + * @return Attempt */ #[\NoDiscard] - public function verify(int $size): void + public function verify(int $size): Attempt { if (!$this->allows($size)) { - throw new FrameExceedAllowedSize($size, $this); + return Attempt::error(new FrameExceedAllowedSize($size, $this)); } + + return Attempt::result(SideEffect::identity()); } /** diff --git a/src/Transport/Connection.php b/src/Transport/Connection.php index 12673b9..8d868f4 100644 --- a/src/Transport/Connection.php +++ b/src/Transport/Connection.php @@ -19,8 +19,6 @@ Model\Connection\MaxFrameSize, Model\Connection\TuneOk, Failure, - Exception\FrameChannelExceedAllowedChannelNumber, - Exception\FrameExceedAllowedSize, }; use Innmind\OperatingSystem\CurrentProcess\Signals; use Innmind\IO\{ @@ -36,8 +34,8 @@ use Innmind\OperatingSystem\Remote; use Innmind\Immutable\{ Str, + Attempt, Maybe, - Either, Sequence, SideEffect, Predicate\Instance, @@ -80,7 +78,7 @@ private function __construct( } /** - * @return Maybe + * @return Attempt */ public static function open( Transport $transport, @@ -89,7 +87,7 @@ public static function open( Period $timeout, Clock $clock, Remote $remote, - ): Maybe { + ): Attempt { return $remote ->socket( $transport, @@ -114,7 +112,6 @@ public static function open( (new FrameReader)($protocol), SignalListener::uninstalled(), )) - ->maybe() ->flatMap(new Start($server->authority())) ->flatMap(new Handshake($server->authority())) ->flatMap(new OpenVHost($server->path())); @@ -123,9 +120,9 @@ public static function open( /** * @param callable(Protocol, MaxFrameSize): Sequence $frames * - * @return Either + * @return Attempt */ - public function respondTo(Method $method, callable $frames): Either + public function respondTo(Method $method, callable $frames): Attempt { return $this ->wait($method) @@ -137,9 +134,9 @@ public function respondTo(Method $method, callable $frames): Either /** * @param callable(Protocol, MaxFrameSize): Sequence $frames * - * @return Either + * @return Attempt */ - public function request(callable $frames, Method $method, Method ...$methods): Either + public function request(callable $frames, Method $method, Method ...$methods): Attempt { return $this ->sendFrames($frames) @@ -150,17 +147,17 @@ public function request(callable $frames, Method $method, Method ...$methods): E /** * @param callable(Protocol, MaxFrameSize): Sequence $frames * - * @return Either + * @return Attempt */ - public function send(callable $frames): Either + public function send(callable $frames): Attempt { return $this->sendFrames($frames); } /** - * @return Either + * @return Attempt */ - public function wait(Method ...$names): Either + public function wait(Method ...$names): Attempt { return $this ->socket @@ -175,7 +172,6 @@ public function wait(Method ...$names): Either ->one() ->map(ReceivedFrame::of(...)) ->map($this->flagActive(...)) - ->either() ->eitherWay( fn($received) => match ($received->frame()->type()) { Type::heartbeat => $this->wait(...$names), @@ -183,15 +179,15 @@ public function wait(Method ...$names): Either }, fn() => $this->signals->close( $this, - static fn() => Either::left(Failure::toReadFrame()), + static fn() => Attempt::error(Failure::toReadFrame()), ), ); } /** - * @return Maybe + * @return Attempt */ - public function close(): Maybe + public function close(): Attempt { $this->signals->uninstall(); @@ -200,19 +196,18 @@ public function close(): Maybe static fn($protocol) => $protocol->connection()->close(Close::demand()), Method::connectionCloseOk, ) - ->maybe() - ->flatMap(fn() => $this->socket->close()->maybe()) + ->flatMap(fn() => $this->socket->close()) ->map(static fn() => new SideEffect); } /** - * @return Maybe + * @return Attempt */ public function tune( MaxChannels $maxChannels, MaxFrameSize $maxFrameSize, Period $heartbeat, - ): Maybe { + ): Attempt { return $this ->send(static fn($protocol) => $protocol->connection()->tuneOk( TuneOk::of( @@ -221,7 +216,6 @@ public function tune( $heartbeat, ), )) - ->maybe() ->map(fn() => new self( $this->protocol, $this->heartbeat->adjust($heartbeat), @@ -254,68 +248,62 @@ private function flagActive(ReceivedFrame $received): ReceivedFrame * * @param callable(Protocol, MaxFrameSize): Sequence $frames * - * @throws FrameChannelExceedAllowedChannelNumber - * @throws FrameExceedAllowedSize - * - * @return Either + * @return Attempt */ - private function sendFrames(callable $frames): Either + private function sendFrames(callable $frames): Attempt { - $data = $frames($this->protocol, $this->maxFrameSize) - ->map(function($frame) { - $this->maxChannels->verify($frame->channel()->toInt()); - - return $frame; - }) - ->map(static fn($frame) => $frame->pack()) - ->map(function($frame) { - $this->maxFrameSize->verify($frame->length()); - - return $frame; - }); + $data = $frames($this->protocol, $this->maxFrameSize)->map( + fn($frame) => $this + ->maxChannels + ->verify($frame->channel()->toInt()) + ->map(static fn() => $frame->pack()) + ->flatMap( + fn($frame) => $this + ->maxFrameSize + ->verify($frame->length()) + ->map(static fn() => $frame), + ), + ); return $this ->socket ->abortWhen($this->signals->notified(...)) - ->sink($data) - ->either() + ->sinkAttempts($data) ->eitherWay( - static fn() => Either::right(new SideEffect), + static fn() => Attempt::result(new SideEffect), fn() => $this->signals->close( $this, - static fn() => Either::left(Failure::toSendFrame()), + static fn() => Attempt::error(Failure::toSendFrame()), ), ); } /** - * @return Either + * @return Attempt */ private function ensureValidFrame( ReceivedFrame $received, Method ...$names, - ): Either { + ): Attempt { if (\count($names) === 0) { - /** @var Either */ - return Either::right($received); + return Attempt::result($received); } if ($received->frame()->type() !== Type::method) { // someone must have forgot a wait() call - /** @var Either */ - return Either::left(Failure::unexpectedFrame()); + /** @var Attempt */ + return Attempt::error(Failure::unexpectedFrame()); } if ($received->oneOf(...$names)) { - /** @var Either */ - return Either::right($received); + return Attempt::result($received); } if ($received->is(Method::connectionClose)) { - /** @var Either */ + /** @var Attempt */ return $this ->send(static fn($protocol) => $protocol->connection()->closeOk()) - ->leftMap(static fn() => Failure::toCloseConnection()) + ->mapError(Failure::as(Failure::toCloseConnection())) ->flatMap(static function() use ($received) { $message = $received ->frame() @@ -355,11 +343,11 @@ private function ensureValidFrame( static fn(int $class, int $method) => Method::of($class, $method), ); - return Either::left(Failure::closedByServer($message, $code, $method)); + return Attempt::error(Failure::closedByServer($message, $code, $method)); }); } - /** @var Either */ - return Either::left(Failure::unexpectedFrame()); + /** @var Attempt */ + return Attempt::error(Failure::unexpectedFrame()); } } diff --git a/src/Transport/Connection/Handshake.php b/src/Transport/Connection/Handshake.php index 7c654c1..d2b1d99 100644 --- a/src/Transport/Connection/Handshake.php +++ b/src/Transport/Connection/Handshake.php @@ -16,8 +16,8 @@ use Innmind\TimeContinuum\Period; use Innmind\Url\Authority; use Innmind\Immutable\{ + Attempt, Maybe, - Either, Predicate\Instance, }; @@ -34,23 +34,22 @@ public function __construct(Authority $authority) } /** - * @return Maybe + * @return Attempt */ - public function __invoke(Connection $connection): Maybe + public function __invoke(Connection $connection): Attempt { return $connection ->wait(Method::connectionSecure, Method::connectionTune) ->flatMap(fn($received) => match ($received->is(Method::connectionSecure)) { true => $this->secure($connection), false => $this->maybeTune($connection, $received->frame()), - }) - ->maybe(); + }); } /** - * @return Either + * @return Attempt */ - private function secure(Connection $connection): Either + private function secure(Connection $connection): Attempt { return $connection ->request( @@ -66,9 +65,9 @@ private function secure(Connection $connection): Either } /** - * @return Either + * @return Attempt */ - private function maybeTune(Connection $connection, Frame $frame): Either + private function maybeTune(Connection $connection, Frame $frame): Attempt { $maxChannels = $frame ->values() @@ -90,8 +89,15 @@ private function maybeTune(Connection $connection, Frame $frame): Either ->map(Period::millisecond(...)); return Maybe::all($maxChannels, $maxFrameSize, $heartbeat) - ->flatMap($connection->tune(...)) - ->either() - ->leftMap(static fn() => Failure::toOpenConnection()); + ->flatMap( + static fn( + MaxChannels $maxChannels, + MaxFrameSize $maxFrameSize, + Period $heartbeat, + ) => $connection + ->tune($maxChannels, $maxFrameSize, $heartbeat) + ->maybe(), + ) + ->attempt(static fn() => Failure::toOpenConnection()); } } diff --git a/src/Transport/Connection/MessageReader.php b/src/Transport/Connection/MessageReader.php index f9eb361..a18d3c5 100644 --- a/src/Transport/Connection/MessageReader.php +++ b/src/Transport/Connection/MessageReader.php @@ -30,7 +30,7 @@ Predicate\Instance, Sequence, Maybe, - Either, + Attempt, }; /** @@ -46,9 +46,9 @@ private function __construct(Filesystem $filesystem) } /** - * @return Either + * @return Attempt */ - public function __invoke(Connection $connection): Either + public function __invoke(Connection $connection): Attempt { return $connection ->wait() @@ -64,19 +64,18 @@ public static function of(Filesystem $filesystem): self } /** - * @return Either + * @return Attempt */ private function decode( Connection $connection, Frame $header, - ): Either { + ): Attempt { return $header ->values() ->first() ->keep(Instance::of(Value\UnsignedLongLongInteger::class)) ->map(static fn($value) => $value->original()) - ->either() - ->leftMap(static fn() => Failure::toReadMessage()) + ->attempt(static fn() => Failure::toReadMessage()) ->flatMap(fn($bodySize) => $this->readMessage($connection, $bodySize)) ->flatMap( fn($message) => $header @@ -84,8 +83,7 @@ private function decode( ->get(1) ->keep(Instance::of(Value\UnsignedShortInteger::class)) ->map(static fn($value) => $value->original()) - ->either() - ->leftMap(static fn() => Failure::toReadMessage()) + ->attempt(static fn() => Failure::toReadMessage()) ->flatMap(fn($flagBits) => $this->addProperties( $message, $flagBits, @@ -99,13 +97,13 @@ private function decode( /** * @param Sequence $properties * - * @return Either + * @return Attempt */ private function addProperties( Message $message, int $flagBits, Sequence $properties, - ): Either { + ): Attempt { /** @var Sequence, Message): Maybe}> */ $toParse = Sequence::of( [ @@ -228,30 +226,26 @@ private function addProperties( * @psalm-suppress MixedArrayAccess * @psalm-suppress MixedArgument * @psalm-suppress MixedMethodCall - * @var Either */ return $toParse ->filter(static fn($pair) => (bool) ($flagBits & (1 << $pair[0]))) ->map(static fn($pair) => $pair[1]) - ->reduce( - Maybe::just([$properties, $message]), - static fn(Maybe $state, $parse): Maybe => $state->flatMap( - static fn($state) => $parse($state[0]->first(), $state[1]) - ->map(static fn($message) => [$state[0]->drop(1), $message]), - ), + ->sink([$properties, $message]) + ->maybe( + static fn($state, $parse) => $parse($state[0]->first(), $state[1]) + ->map(static fn($message) => [$state[0]->drop(1), $message]), ) ->map(static fn($state) => $state[1]) - ->either() - ->leftMap(static fn() => Failure::toReadMessage()); + ->attempt(static fn() => Failure::toReadMessage()); } /** - * @return Either + * @return Attempt */ private function readMessage( Connection $connection, int $bodySize, - ): Either { + ): Attempt { $chunks = Sequence::lazy(static function() use ($connection, $bodySize) { $continue = $bodySize !== 0; $read = 0; @@ -290,8 +284,7 @@ private function readMessage( }; return $content - ->either() ->map(Message::file(...)) - ->leftMap(static fn() => Failure::toReadMessage()); + ->mapError(Failure::as(Failure::toReadMessage())); } } diff --git a/src/Transport/Connection/OpenVHost.php b/src/Transport/Connection/OpenVHost.php index 6f5c194..b6285fa 100644 --- a/src/Transport/Connection/OpenVHost.php +++ b/src/Transport/Connection/OpenVHost.php @@ -9,7 +9,7 @@ Model\Connection\Open, }; use Innmind\Url\Path; -use Innmind\Immutable\Maybe; +use Innmind\Immutable\Attempt; /** * @internal @@ -24,9 +24,9 @@ public function __construct(Path $vhost) } /** - * @return Maybe + * @return Attempt */ - public function __invoke(Connection $connection): Maybe + public function __invoke(Connection $connection): Attempt { return $connection ->request( @@ -35,7 +35,6 @@ public function __invoke(Connection $connection): Maybe ), Method::connectionOpenOk, ) - ->maybe() ->map(static fn() => $connection); } } diff --git a/src/Transport/Connection/SignalListener.php b/src/Transport/Connection/SignalListener.php index 861bef4..598c34d 100644 --- a/src/Transport/Connection/SignalListener.php +++ b/src/Transport/Connection/SignalListener.php @@ -13,7 +13,7 @@ use Innmind\OperatingSystem\CurrentProcess\Signals; use Innmind\Signals\Signal; use Innmind\Immutable\{ - Either, + Attempt, Maybe, }; @@ -100,16 +100,15 @@ public function notified(): bool /** * @template T * - * @param callable(): Either $continue + * @param callable(): Attempt $continue * - * @return Either + * @return Attempt */ - public function close(Connection $connection, callable $continue): Either + public function close(Connection $connection, callable $continue): Attempt { return Maybe::all($this->received, $this->channel) ->map(static fn(Signal $signal, Channel $channel) => [$signal, $channel]) ->filter(fn() => !$this->closing) - ->either() ->match( function($in) use ($connection) { $this->closing = true; @@ -123,14 +122,13 @@ function($in) use ($connection) { ), Method::channelCloseOk, ) - ->leftMap(static fn() => Failure::toCloseChannel()) + ->mapError(Failure::as(Failure::toCloseChannel())) ->flatMap( static fn() => $connection ->close() - ->either() - ->leftMap(static fn() => Failure::toCloseConnection()), + ->mapError(Failure::as(Failure::toCloseConnection())), ) - ->flatMap(static fn() => Either::left(Failure::closedBySignal($signal))); + ->flatMap(static fn() => Attempt::error(Failure::closedBySignal($signal))); }, static fn() => $continue(), ); diff --git a/src/Transport/Connection/Start.php b/src/Transport/Connection/Start.php index 67a5ae0..7625c4e 100644 --- a/src/Transport/Connection/Start.php +++ b/src/Transport/Connection/Start.php @@ -9,7 +9,7 @@ Model\Connection\StartOk, }; use Innmind\Url\Authority; -use Innmind\Immutable\Maybe; +use Innmind\Immutable\Attempt; /** * @internal @@ -24,9 +24,9 @@ public function __construct(Authority $authority) } /** - * @return Maybe + * @return Attempt */ - public function __invoke(Connection $connection): Maybe + public function __invoke(Connection $connection): Attempt { // at this point the server could respond with a simple text "AMQP0xyz" // where xyz represent the version of the protocol it supports meaning @@ -43,7 +43,6 @@ public function __invoke(Connection $connection): Maybe ), ), ) - ->maybe() ->map(static fn() => $connection); } } diff --git a/tests/Model/Connection/MaxChannelsTest.php b/tests/Model/Connection/MaxChannelsTest.php index 45c791c..a91f29a 100644 --- a/tests/Model/Connection/MaxChannelsTest.php +++ b/tests/Model/Connection/MaxChannelsTest.php @@ -7,6 +7,7 @@ Model\Connection\MaxChannels, Exception\FrameChannelExceedAllowedChannelNumber, }; +use Innmind\Immutable\SideEffect; use Innmind\BlackBox\{ PHPUnit\BlackBox, PHPUnit\Framework\TestCase, @@ -86,7 +87,10 @@ public function testVerifyAllowedNumbers() ->then(function($size) { $max = MaxChannels::of(0); - $this->assertNull($max->verify($size)); + $this->assertInstanceOf( + SideEffect::class, + $max->verify($size)->unwrap(), + ); }); $this ->forAll( @@ -96,7 +100,10 @@ public function testVerifyAllowedNumbers() ->then(function($allowed, $numberBelow) { $max = MaxChannels::of($allowed); - $this->assertNull($max->verify($allowed - $numberBelow)); + $this->assertInstanceOf( + SideEffect::class, + $max->verify($allowed - $numberBelow)->unwrap(), + ); }); } @@ -115,7 +122,7 @@ public function testThrowWhenVerifyingNumberAboveMaxAllowed(): BlackBox\Proof $above = $allowed + $extraNumber; try { - $max->verify($above); + $max->verify($above)->unwrap(); $this->fail('it should throw'); } catch (FrameChannelExceedAllowedChannelNumber $e) { $this->assertSame( diff --git a/tests/Model/Connection/MaxFrameSizeTest.php b/tests/Model/Connection/MaxFrameSizeTest.php index 85351d3..fe82319 100644 --- a/tests/Model/Connection/MaxFrameSizeTest.php +++ b/tests/Model/Connection/MaxFrameSizeTest.php @@ -7,6 +7,7 @@ Model\Connection\MaxFrameSize, Exception\FrameExceedAllowedSize, }; +use Innmind\Immutable\SideEffect; use Innmind\BlackBox\{ PHPUnit\BlackBox, PHPUnit\Framework\TestCase, @@ -86,7 +87,10 @@ public function testVerifyAllowedSizes() ->then(function($size) { $max = MaxFrameSize::of(0); - $this->assertNull($max->verify($size)); + $this->assertInstanceOf( + SideEffect::class, + $max->verify($size)->unwrap(), + ); }); $this ->forAll( @@ -96,7 +100,10 @@ public function testVerifyAllowedSizes() ->then(function($allowed, $sizeBelow) { $max = MaxFrameSize::of($allowed); - $this->assertNull($max->verify($allowed - $sizeBelow)); + $this->assertInstanceOf( + SideEffect::class, + $max->verify($allowed - $sizeBelow)->unwrap(), + ); }); } @@ -117,7 +124,7 @@ public function testThrowWhenVerifyingSizeAboveMaxAllowed(): BlackBox\Proof $this->expectException(FrameExceedAllowedSize::class); $this->expectExceptionMessage("Max frame size can be $allowed but got $above"); - $max->verify($above); + $max->verify($above)->unwrap(); }); }