Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Changed

- Requires `innmind/foundation:~1.5`
- `Innmind\AMQP\Factory::make()` timeout argument is now expressed via `Innmind\TimeContinuum\Period`
- `Innmind\AMQP\Model\Basic\Message` expiration is now expressed via `Innmind\TimeContinuum\Period`

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion benchmark/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
->make(
Transport::tcp(),
Url::of('//guest:guest@localhost:5672/'),
Period::second(1)->asElapsedPeriod(),
Period::second(1),
)
->listenSignals($os->process());
2 changes: 1 addition & 1 deletion fixtures/forever-consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
->make(
Transport::tcp(),
Url::of('//guest:guest@localhost:5672/'),
Period::second(1)->asElapsedPeriod(),
Period::second(1),
)
->listenSignals($os->process())
->with(DeclareQueue::of('always-empty'))
Expand Down
4 changes: 2 additions & 2 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use Innmind\OperatingSystem\OperatingSystem;
use Innmind\IO\Sockets\Internet\Transport as Socket;
use Innmind\Url\Url;
use Innmind\TimeContinuum\ElapsedPeriod;
use Innmind\TimeContinuum\Period;

final class Factory
{
Expand All @@ -27,7 +27,7 @@ public static function of(OperatingSystem $os): self
public function make(
Socket $transport,
Url $server,
ElapsedPeriod $timeout,
Period $timeout,
): Client {
return Client::of(
fn() => Transport\Connection::open(
Expand Down
10 changes: 5 additions & 5 deletions src/Model/Basic/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
};
use Innmind\TimeContinuum\{
PointInTime,
ElapsedPeriod,
Period,
};
use Innmind\Filesystem\File\Content;
use Innmind\Immutable\{
Expand Down Expand Up @@ -47,7 +47,7 @@ final class Message
private Maybe $correlationId;
/** @var Maybe<ReplyTo> */
private Maybe $replyTo;
/** @var Maybe<ElapsedPeriod> */
/** @var Maybe<Period> */
private Maybe $expiration;
/** @var Maybe<Id> */
private Maybe $id;
Expand Down Expand Up @@ -88,7 +88,7 @@ private function __construct(Sequence $chunks, int $length)
$this->correlationId = Maybe::nothing();
/** @var Maybe<ReplyTo> */
$this->replyTo = Maybe::nothing();
/** @var Maybe<ElapsedPeriod> */
/** @var Maybe<Period> */
$this->expiration = Maybe::nothing();
/** @var Maybe<Id> */
$this->id = Maybe::nothing();
Expand Down Expand Up @@ -268,7 +268,7 @@ public function withReplyTo(ReplyTo $replyTo): self
}

/**
* @return Maybe<ElapsedPeriod>
* @return Maybe<Period>
*/
#[\NoDiscard]
public function expiration(): Maybe
Expand All @@ -277,7 +277,7 @@ public function expiration(): Maybe
}

#[\NoDiscard]
public function withExpiration(ElapsedPeriod $expiration): self
public function withExpiration(Period $expiration): self
{
$self = clone $this;
$self->expiration = Maybe::just($expiration);
Expand Down
10 changes: 5 additions & 5 deletions src/Model/Connection/TuneOk.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Innmind\AMQP\Model\Connection;

use Innmind\TimeContinuum\ElapsedPeriod;
use Innmind\TimeContinuum\Period;

/**
* @psalm-immutable
Expand All @@ -12,12 +12,12 @@ final class TuneOk
{
private MaxChannels $maxChannels;
private MaxFrameSize $maxFrameSize;
private ElapsedPeriod $heartbeat;
private Period $heartbeat;

private function __construct(
MaxChannels $maxChannels,
MaxFrameSize $maxFrameSize,
ElapsedPeriod $heartbeat,
Period $heartbeat,
) {
$this->maxChannels = $maxChannels;
$this->maxFrameSize = $maxFrameSize;
Expand All @@ -31,7 +31,7 @@ private function __construct(
public static function of(
MaxChannels $maxChannels,
MaxFrameSize $maxFrameSize,
ElapsedPeriod $heartbeat,
Period $heartbeat,
): self {
return new self($maxChannels, $maxFrameSize, $heartbeat);
}
Expand All @@ -55,7 +55,7 @@ public function maxFrameSize(): int
}

#[\NoDiscard]
public function heartbeat(): ElapsedPeriod
public function heartbeat(): Period
{
return $this->heartbeat;
}
Expand Down
10 changes: 5 additions & 5 deletions src/Transport/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
};
use Innmind\Url\Url;
use Innmind\TimeContinuum\{
ElapsedPeriod,
Period,
Clock,
};
use Innmind\OperatingSystem\Remote;
Expand Down Expand Up @@ -86,7 +86,7 @@ public static function open(
Transport $transport,
Url $server,
Protocol $protocol,
ElapsedPeriod $timeout,
Period $timeout,
Clock $clock,
Remote $remote,
): Maybe {
Expand All @@ -97,7 +97,7 @@ public static function open(
)
->map(
static fn($socket) => $socket
->timeoutAfter($timeout->asPeriod())
->timeoutAfter($timeout)
->toEncoding(Str\Encoding::ascii),
)
->flatMap(
Expand Down Expand Up @@ -211,7 +211,7 @@ public function close(): Maybe
public function tune(
MaxChannels $maxChannels,
MaxFrameSize $maxFrameSize,
ElapsedPeriod $heartbeat,
Period $heartbeat,
): Maybe {
return $this
->send(static fn($protocol) => $protocol->connection()->tuneOk(
Expand All @@ -225,7 +225,7 @@ public function tune(
->map(fn() => new self(
$this->protocol,
$this->heartbeat->adjust($heartbeat),
$this->socket->timeoutAfter($heartbeat->asPeriod()),
$this->socket->timeoutAfter($heartbeat),
$maxChannels,
$maxFrameSize,
$this->frame,
Expand Down
3 changes: 1 addition & 2 deletions src/Transport/Connection/Handshake.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ private function maybeTune(Connection $connection, Frame $frame): Either
->get(2)
->keep(Instance::of(Value\UnsignedShortInteger::class))
->map(static fn($value) => $value->original())
->map(Period::millisecond(...))
->map(static fn($period) => $period->asElapsedPeriod());
->map(Period::millisecond(...));

return Maybe::all($maxChannels, $maxFrameSize, $heartbeat)
->flatMap($connection->tune(...))
Expand Down
12 changes: 6 additions & 6 deletions src/Transport/Connection/Heartbeat.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use Innmind\TimeContinuum\{
Clock,
PointInTime,
ElapsedPeriod,
Period,
};
use Innmind\Immutable\Sequence;

Expand All @@ -19,20 +19,20 @@
final class Heartbeat
{
private Clock $clock;
private ElapsedPeriod $threshold;
private Period $threshold;
private PointInTime $lastReceivedData;

private function __construct(
Clock $clock,
ElapsedPeriod $threshold,
Period $threshold,
PointInTime $lastReceivedData,
) {
$this->clock = $clock;
$this->threshold = $threshold;
$this->lastReceivedData = $lastReceivedData;
}

public static function start(Clock $clock, ElapsedPeriod $threshold): self
public static function start(Clock $clock, Period $threshold): self
{
return new self($clock, $threshold, $clock->now());
}
Expand All @@ -47,7 +47,7 @@ public function frames(): Sequence
->clock
->now()
->elapsedSince($this->lastReceivedData)
->longerThan($this->threshold)
->longerThan($this->threshold->asElapsedPeriod())
) {
$this->lastReceivedData = $this->clock->now();

Expand All @@ -62,7 +62,7 @@ public function active(): void
$this->lastReceivedData = $this->clock->now();
}

public function adjust(ElapsedPeriod $threshold): self
public function adjust(Period $threshold): self
{
return new self(
$this->clock,
Expand Down
2 changes: 1 addition & 1 deletion src/Transport/Connection/MessageReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private function addProperties(
->asPredicate(),
)
->map(Period::millisecond(...))
->map(static fn($expiration) => $message->withExpiration($expiration->asElapsedPeriod())),
->map(static fn($expiration) => $message->withExpiration($expiration)),
],
[
7,
Expand Down
8 changes: 2 additions & 6 deletions src/Transport/Protocol/Basic.php
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,8 @@ private function serializeProperties(Message $message): array
static fn($expiration) => [
$flagBits | (1 << 8),
($properties)(ShortString::of(Str::of((string) (
$expiration
->asPeriod()
->milliseconds() +
$expiration
->asPeriod()
->seconds() * 1000
$expiration->milliseconds() +
$expiration->seconds() * 1000
)))),
],
static fn() => [$flagBits, $properties],
Expand Down
1 change: 0 additions & 1 deletion src/Transport/Protocol/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public function tuneOk(TuneOk $command): Sequence
UnsignedShortInteger::of(
$command
->heartbeat()
->asPeriod()
->seconds(),
),
));
Expand Down
6 changes: 3 additions & 3 deletions tests/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function setUp(): void
$this->client = Factory::of($this->os)->make(
Transport::tcp(),
Url::of('//guest:guest@localhost:5672/'),
Period::second(1)->asElapsedPeriod(),
Period::second(1),
);
}

Expand Down Expand Up @@ -284,7 +284,7 @@ public function testGetMessageWithAllProperties()
->withPriority(Message\Priority::five)
->withCorrelationId(Message\CorrelationId::of('correlation'))
->withReplyTo(Message\ReplyTo::of('reply'))
->withExpiration(Period::second(10)->asElapsedPeriod())
->withExpiration(Period::second(10))
->withId(Message\Id::of('id'))
->withTimestamp($now = $this->os->clock()->now())
->withType(Message\Type::of('type'))
Expand Down Expand Up @@ -416,7 +416,7 @@ public function testGetMessageWithAllProperties()
static fn() => null,
));
$this->assertSame(10000, $message->expiration()->match(
static fn($value) => $value->asPeriod()->seconds() * 1000,
static fn($value) => $value->seconds() * 1000,
static fn() => null,
));
$this->assertSame('id', $message->id()->match(
Expand Down
2 changes: 1 addition & 1 deletion tests/Model/Basic/MessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public function testExpiration()
{
$message = Message::of(Str::of(''));
$message2 = $message->withExpiration(
$expected = Period::second(1)->asElapsedPeriod(),
$expected = Period::second(1),
);

$this->assertInstanceOf(Message::class, $message2);
Expand Down
2 changes: 1 addition & 1 deletion tests/Model/Connection/TuneOkTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public function testInterface()
$command = TuneOk::of(
MaxChannels::of(1),
MaxFrameSize::of(10),
$heartbeat = Period::second(1)->asElapsedPeriod(),
$heartbeat = Period::second(1),
);

$this->assertSame(1, $command->maxChannels());
Expand Down
2 changes: 1 addition & 1 deletion tests/Transport/Connection/FrameReaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public function testReadHeader()
->withPriority(Priority::five)
->withCorrelationId(CorrelationId::of('correlation'))
->withReplyTo(ReplyTo::of('reply'))
->withExpiration(Period::second(1)->asElapsedPeriod())
->withExpiration(Period::second(1))
->withId(Id::of('id'))
->withTimestamp($now = PointInTime::now())
->withType(MessageType::of('type'))
Expand Down
8 changes: 4 additions & 4 deletions tests/Transport/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function testInterface()
Transport::tcp(),
Url::of('//guest:guest@localhost:5672/'),
$protocol = new Protocol($os->clock(), new ArgumentTranslator),
Period::second(1)->asElapsedPeriod(),
Period::second(1),
$os->clock(),
$os->remote(),
$os->sockets(),
Expand Down Expand Up @@ -76,7 +76,7 @@ public function testClose()
Transport::tcp(),
Url::of('//guest:guest@localhost:5672/'),
$protocol = new Protocol($os->clock(), new ArgumentTranslator),
Period::second(1)->asElapsedPeriod(),
Period::second(1),
$os->clock(),
$os->remote(),
$os->sockets(),
Expand All @@ -100,7 +100,7 @@ public function testReturnFailureWhenReceivedFrameIsNotTheExpectedOne()
Transport::tcp(),
Url::of('//guest:guest@localhost:5672/'),
new Protocol($os->clock(), new ArgumentTranslator),
Period::second(1)->asElapsedPeriod(),
Period::second(1),
$os->clock(),
$os->remote(),
$os->sockets(),
Expand Down Expand Up @@ -132,7 +132,7 @@ public function testReturnFailureWhenConnectionClosedByServer()
Transport::tcp(),
Url::of('//guest:guest@localhost:5672/'),
$protocol = new Protocol($os->clock(), new ArgumentTranslator),
Period::second(1)->asElapsedPeriod(),
Period::second(1),
$os->clock(),
$os->remote(),
$os->sockets(),
Expand Down
2 changes: 1 addition & 1 deletion tests/Transport/Protocol/BasicTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public function testPublishWithProperties()
->withPriority(Priority::five)
->withCorrelationId(CorrelationId::of('correlation'))
->withReplyTo(ReplyTo::of('reply'))
->withExpiration(Period::second(1)->asElapsedPeriod())
->withExpiration(Period::second(1))
->withId(Id::of('id'))
->withTimestamp($now = PointInTime::now())
->withType(MessageType::of('type'))
Expand Down
2 changes: 1 addition & 1 deletion tests/Transport/Protocol/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public function testTuneOk()
TuneOk::of(
MaxChannels::of(1),
MaxFrameSize::of(10),
Period::second(3)->asElapsedPeriod(),
Period::second(3),
),
)->match(
static fn($frame) => $frame,
Expand Down
2 changes: 1 addition & 1 deletion tests/Transport/ProtocolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public function testReadHeader()
->withPriority(Priority::five)
->withCorrelationId(CorrelationId::of('correlation'))
->withReplyTo(ReplyTo::of('reply'))
->withExpiration(Period::second(1)->asElapsedPeriod())
->withExpiration(Period::second(1))
->withId(Id::of('id'))
->withTimestamp($now = PointInTime::now())
->withType(Type::of('type'))
Expand Down