Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

### Changed

- Requires `innmind/foundation:~1.5`
- Requires `innmind/foundation:~1.7`
- `Innmind\AMQP\Factory::make()` timeout argument is now expressed via `Innmind\TimeContinuum\Period`
- `Innmind\AMQP\Model\Basic\Message` expiration is now expressed via `Innmind\TimeContinuum\Period`
- `Innmind\AMQP\Failure` is now an exception that wraps each possible failure object
- `Innmind\AMQP\Client::run()` now returns an `Innmind\Immutable\Attempt`
- `Innmind\AMQP\Command::__invoke()` now must return an `Innmind\Immutable\Attempt`

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
},
"require": {
"php": "~8.2",
"innmind/foundation": "~1.5",
"innmind/foundation": "^1.7.1",
"ramsey/uuid": "~4.0"
},
"autoload": {
Expand Down
34 changes: 15 additions & 19 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Filesystem,
};
use Innmind\Immutable\{
Either,
Attempt,
Maybe,
SideEffect,
};
Expand All @@ -24,15 +24,15 @@ final class Client
{
/** @var Maybe<Command> */
private Maybe $command;
/** @var callable(): Maybe<Connection> */
/** @var callable(): Attempt<Connection> */
private $load;
private Filesystem $filesystem;
/** @var Maybe<CurrentProcess> */
private Maybe $signals;

/**
* @param Maybe<Command> $command
* @param callable(): Maybe<Connection> $load
* @param callable(): Attempt<Connection> $load
* @param Maybe<CurrentProcess> $signals
*/
private function __construct(
Expand All @@ -48,7 +48,7 @@ private function __construct(
}

/**
* @param callable(): Maybe<Connection> $load
* @param callable(): Attempt<Connection> $load
*/
#[\NoDiscard]
public static function of(callable $load, Filesystem $filesystem): self
Expand Down Expand Up @@ -96,10 +96,10 @@ public function listenSignals(CurrentProcess $currentProcess): self
*
* @param T $state
*
* @return Either<Failure, T>
* @return Attempt<T>
*/
#[\NoDiscard]
public function run(mixed $state): Either
public function run(mixed $state): Attempt
{
return $this->command->match(
fn($command) => $this
Expand All @@ -114,24 +114,22 @@ public function run(mixed $state): Either
->map(static fn(): mixed => $state->unwrap()),
);
}),
static fn() => Either::right($state),
static fn() => Attempt::result($state),
);
}

/**
* @return Either<Failure, array{Connection, Channel}>
* @return Attempt<array{Connection, Channel}>
*/
private function openChannel(): Either
private function openChannel(): Attempt
{
// Since the connection is never shared between objects then there is no
// need to have a dynamic channel number as there will ALWAYS be one
// channel per connection
$channel = new Channel(1);

/** @var Either<Failure, array{Connection, Channel}> */
return ($this->load)()
->either()
->leftMap(static fn() => Failure::toOpenConnection())
->mapError(Failure::as(Failure::toOpenConnection()))
->flatMap(
fn($connection) => $connection
->request(
Expand All @@ -146,16 +144,15 @@ private function openChannel(): Either
static fn() => null,
))
->map(static fn() => [$connection, $channel])
->leftMap(static fn() => Failure::toOpenChannel()),
->mapError(Failure::as(Failure::toOpenChannel())),
);
}

/**
* @return Either<Failure, SideEffect>
* @return Attempt<SideEffect>
*/
private function close(Connection $connection, Channel $channel): Either
private function close(Connection $connection, Channel $channel): Attempt
{
/** @var Either<Failure, SideEffect> */
return $connection
->request(
static fn($protocol) => $protocol->channel()->close(
Expand All @@ -164,12 +161,11 @@ private function close(Connection $connection, Channel $channel): Either
),
Method::channelCloseOk,
)
->leftMap(static fn() => Failure::toCloseChannel())
->mapError(Failure::as(Failure::toCloseChannel()))
->flatMap(
static fn() => $connection
->close()
->either()
->leftMap(static fn() => Failure::toCloseConnection()),
->mapError(Failure::as(Failure::toCloseConnection())),
);
}
}
6 changes: 3 additions & 3 deletions src/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
Frame\Channel,
Connection\MessageReader,
};
use Innmind\Immutable\Either;
use Innmind\Immutable\Attempt;

interface Command
{
/**
* @return Either<Failure, Client\State>
* @return Attempt<Client\State>
*/
#[\NoDiscard]
public function __invoke(
Connection $connection,
Channel $channel,
MessageReader $read,
Client\State $state,
): Either;
): Attempt;
}
6 changes: 3 additions & 3 deletions src/Command/Bind.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Model\Queue\Binding,
};
use Innmind\Immutable\{
Either,
Attempt,
Sequence,
};

Expand All @@ -34,7 +34,7 @@ public function __invoke(
Channel $channel,
MessageReader $read,
State $state,
): Either {
): Attempt {
$frames = fn(Protocol $protocol): Sequence => $protocol->queue()->bind(
$channel,
$this->command,
Expand All @@ -47,7 +47,7 @@ public function __invoke(

return $sideEffect
->map(static fn() => $state)
->leftMap(fn() => Failure::toBind($this->command));
->mapError(Failure::as(Failure::toBind($this->command)));
}

#[\NoDiscard]
Expand Down
40 changes: 19 additions & 21 deletions src/Command/Consume.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
};
use Innmind\Immutable\{
Maybe,
Either,
Attempt,
Sequence,
Predicate\Instance,
};
Expand All @@ -48,7 +48,7 @@ public function __invoke(
Channel $channel,
MessageReader $read,
State $state,
): Either {
): Attempt {
$frames = fn(Protocol $protocol): Sequence => $protocol->basic()->consume(
$channel,
$this->command,
Expand All @@ -69,8 +69,8 @@ public function __invoke(
->map(static fn() => $state),
};

return $sideEffect->leftMap(
fn() => Failure::toConsume($this->command),
return $sideEffect->mapError(
Failure::as(Failure::toConsume($this->command)),
);
}

Expand All @@ -93,22 +93,22 @@ public function handle(callable $consume): self
}

/**
* @return Either<Failure, State>
* @return Attempt<State>
*/
private function maybeStart(
Connection $connection,
Channel $channel,
MessageReader $read,
Frame $frame,
State $state,
): Either {
): Attempt {
return $frame
->values()
->first()
->keep(Instance::of(Value\ShortString::class))
->map(static fn($value) => $value->original()->toString())
->either()
->leftMap(fn() => Failure::toConsume($this->command))
->attempt(fn() => Failure::toConsume($this->command))
->flatMap(fn($consumerTag) => $this->start(
$connection,
$channel,
Expand All @@ -119,17 +119,17 @@ private function maybeStart(
}

/**
* @return Either<Failure, State>
* @return Attempt<State>
*/
private function start(
Connection $connection,
Channel $channel,
MessageReader $read,
State $state,
string $consumerTag,
): Either {
/** @var Either<Failure, State|Canceled> */
$consumed = Either::right($state);
): Attempt {
/** @var Attempt<State|Canceled> */
$consumed = Attempt::result($state);
// here the best approach would be to use recursion to avoid unwrapping
// the monads but it would end up with a too deep call stack for inifite
// consumers as each new message would mean a new function call in the
Expand Down Expand Up @@ -160,16 +160,15 @@ private function start(
}

/**
* @return Either<Failure, State|Canceled>
* @return Attempt<State|Canceled>
*/
private function waitDeliver(
Connection $connection,
Channel $channel,
State $state,
string $consumerTag,
MessageReader $read,
): Either {
/** @var Either<Failure, State|Canceled> */
): Attempt {
return $connection
->wait(Method::basicDeliver)
->flatMap(
Expand All @@ -185,11 +184,11 @@ private function waitDeliver(
),
),
)
->leftMap(fn() => Failure::toConsume($this->command));
->mapError(Failure::as(Failure::toConsume($this->command)));
}

/**
* @return Either<Failure, State|Canceled>
* @return Attempt<State|Canceled>
*/
private function maybeConsume(
Connection $connection,
Expand All @@ -199,7 +198,7 @@ private function maybeConsume(
string $consumerTag,
Frame $frame,
Message $message,
): Either {
): Attempt {
$destinationConsumerTag = $frame
->values()
->first()
Expand Down Expand Up @@ -232,8 +231,7 @@ private function maybeConsume(
->flatMap(static fn($details) => $destinationConsumerTag->map(
static fn() => $details, // this manipulation is to make sure the consumerTag is indeed for this consumer
))
->either()
->leftMap(fn() => Failure::toConsume($this->command))
->attempt(fn() => Failure::toConsume($this->command))
->flatMap(fn($details) => $this->consume(
$connection,
$channel,
Expand All @@ -246,7 +244,7 @@ private function maybeConsume(
}

/**
* @return Either<Failure, State|Canceled>
* @return Attempt<State|Canceled>
*/
private function consume(
Connection $connection,
Expand All @@ -256,7 +254,7 @@ private function consume(
Details $details,
Message $message,
string $consumerTag,
): Either {
): Attempt {
return ($this->consume)(
$state->unwrap(),
$message,
Expand Down
6 changes: 3 additions & 3 deletions src/Command/DeclareExchange.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Failure,
};
use Innmind\Immutable\{
Either,
Attempt,
Sequence,
};

Expand All @@ -35,7 +35,7 @@ public function __invoke(
Channel $channel,
MessageReader $read,
State $state,
): Either {
): Attempt {
$frames = fn(Protocol $protocol): Sequence => $protocol->exchange()->declare(
$channel,
$this->command,
Expand All @@ -48,7 +48,7 @@ public function __invoke(

return $sideEffect
->map(static fn() => $state)
->leftMap(fn() => Failure::toDeclareExchange($this->command));
->mapError(Failure::as(Failure::toDeclareExchange($this->command)));
}

#[\NoDiscard]
Expand Down
8 changes: 4 additions & 4 deletions src/Command/DeclareQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
};
use Innmind\Immutable\{
Maybe,
Either,
Attempt,
Sequence,
Predicate\Instance,
};
Expand All @@ -39,7 +39,7 @@ public function __invoke(
Channel $channel,
MessageReader $read,
State $state,
): Either {
): Attempt {
$frames = fn(Protocol $protocol): Sequence => $protocol->queue()->declare(
$channel,
$this->command,
Expand Down Expand Up @@ -71,14 +71,14 @@ public function __invoke(
// maybe in the future we could expose this info to the user
return Maybe::all($name, $message, $consumer)
->map(DeclareOk::of(...))
->either();
->attempt(static fn() => new \RuntimeException('Invalid declare.ok response'));
}),
false => $connection->send($frames),
};

return $sideEffect
->map(static fn() => $state)
->leftMap(fn() => Failure::toDeclareQueue($this->command));
->mapError(Failure::as(Failure::toDeclareQueue($this->command)));
}

#[\NoDiscard]
Expand Down
Loading