Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 25 additions & 26 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,46 +12,41 @@ jobs:
outputs:
version: ${{ steps.supported-versions-matrix.outputs.version }}
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v4
- id: supported-versions-matrix
uses: WyriHaximus/github-action-composer-php-versions-in-range@v1
tests:
services:
postgres:
image: postgres:${{ matrix.postgres }}
env:
POSTGRES_PASSWORD: postgres
POSTGRES_INITDB_ARGS: --auth-host=md5
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
name: Testing on PHP ${{ matrix.php }} with ${{ matrix.composer }} dependency preference against Postgres ${{ matrix.postgres }}
name: Testing on PHP ${{ matrix.php }} with ${{ matrix.composer }} dependency preference against Postgres ${{ matrix.postgres }} with TLS ${{ matrix.tls }}
strategy:
fail-fast: false
matrix:
php: ${{ fromJson(needs.supported-versions-matrix.outputs.version) }}
postgres: [12, 13, 14, 15]
postgres: [16, 17]
composer: [lowest, locked, highest]
tls: ["disable", "require", "verify-ca"]
needs:
- supported-versions-matrix
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: ikalnytskyi/action-setup-postgres@v7
with:
postgres-version: ${{ matrix.postgres }}
ssl: ${{ matrix.tls == 'disable' && false || true }}
id: postgres
- run: |
psql -c "CREATE USER pgasync"
psql -c "ALTER ROLE pgasync PASSWORD 'pgasync'"
psql -c "CREATE USER pgasyncpw"
psql -c "ALTER ROLE pgasyncpw PASSWORD 'example_password'"
psql -c "CREATE USER scram_user"
psql -c "SET password_encryption='scram-sha-256';ALTER ROLE scram_user PASSWORD 'scram_password'"
psql -c "CREATE DATABASE pgasync_test OWNER pgasync"
# cat tests/test_db.sql | xargs -I % psql -c "%"
env:
PGSERVICE: "${{ steps.postgres.outputs.service-name }}"
- run: |
PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE USER pgasync"
PGPASSWORD=postgres psql -h localhost -U postgres -c "ALTER ROLE pgasync PASSWORD 'pgasync'"
PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE USER pgasyncpw"
PGPASSWORD=postgres psql -h localhost -U postgres -c "ALTER ROLE pgasyncpw PASSWORD 'example_password'"
PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE USER scram_user"
PGPASSWORD=postgres psql -h localhost -U postgres -c "SET password_encryption='scram-sha-256';ALTER ROLE scram_user PASSWORD 'scram_password'"
PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE DATABASE pgasync_test OWNER pgasync"
PGPASSWORD=pgasync psql -h localhost -U pgasync -f tests/test_db.sql pgasync_test
# PGPASSWORD=postgres cat tests/test_db.sql | xargs -I % psql -h localhost -U postgres -c "%"
- uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
Expand All @@ -60,4 +55,8 @@ jobs:
with:
dependency-versions: ${{ matrix.composer }}
# - run: vendor/bin/phpunit --testdox
- run: echo "dsn=postgresql://pgasync:pgasync@localhost/pgasync_test?tlsmode=${{ matrix.tls }}&tlsservercert=${{ steps.postgres.outputs.certificate-path }}" >> $GITHUB_OUTPUT
id: dsn
- run: vendor/bin/phpunit
env:
TEST_POSTGRES_DSN: ${{ steps.dsn.outputs.dsn }}
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,35 @@ $client->listen('some_channel')
$client->query("NOTIFY some_channel, 'Hello World'")->subscribe();
```

## Example - Connecting over TLS with CA certificate file
```php

$client = new PgAsync\Client([
"host" => "127.0.0.1",
"port" => "5432",
"user" => "matt",
"database" => "matt",
"tls" => "verify-full",
"tls_connector_flags" => [
"cafile" => "/path/to/ca.crt",
],
]);

$client->query('SELECT * FROM channel')->subscribe(
function ($row) {
var_dump($row);
},
function ($e) {
echo "Failed.\n";
},
function () {
echo "Complete.\n";
}
);


```

## Install
With [composer](https://getcomposer.org/) install into you project with:

Expand Down
11 changes: 6 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@
},
"require": {
"php": ">=7.0.0",
"voryx/event-loop": "^3.0 || ^2.0.2",
"reactivex/rxphp": "^2.0",
"react/socket": "^1.0 || ^0.8 || ^0.7",
"evenement/evenement": "^2.0 | ^3.0"
"voryx/event-loop": "^3.0.2 || ^2.0.2",
"reactivex/rxphp": "^2.0.11",
"react/promise-stream": "^1.5",
"evenement/evenement": "^2.0 | ^3.0.2",
"wyrihaximus/react-opportunistic-tls": "^1.0.0"
},
"require-dev": {
"phpunit/phpunit": ">=8.5.23 || ^6.5.5",
"react/dns": "^1.0"
"react/dns": "^1.12.0"
},
"scripts": {
"docker-up": "cd docker && docker-compose up -d",
Expand Down
103 changes: 82 additions & 21 deletions src/PgAsync/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use PgAsync\Command\PasswordMessage;
use PgAsync\Command\SaslInitialResponse;
use PgAsync\Command\SaslResponse;
use PgAsync\Command\SSLRequest;
use PgAsync\Command\Sync;
use PgAsync\Command\Terminate;
use PgAsync\Message\Authentication;
Expand All @@ -32,9 +33,13 @@
use PgAsync\Message\ReadyForQuery;
use PgAsync\Message\RowDescription;
use PgAsync\Command\StartupMessage;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Socket\Connector;
use React\Promise\Promise;
use React\Socket\ConnectionInterface;
use WyriHaximus\React\Socket\Connector;
use React\Socket\ConnectorInterface;
use WyriHaximus\React\Socket\OpportunisticTlsConnectionInterface;
use React\Stream\DuplexStreamInterface;
use Rx\Disposable\CallbackDisposable;
use Rx\Disposable\EmptyDisposable;
Expand All @@ -43,6 +48,8 @@
use Rx\ObserverInterface;
use Rx\SchedulerInterface;
use Rx\Subject\Subject;
use function React\Promise\resolve;
use function React\Promise\Stream\first;

class Connection extends EventEmitter
{
Expand Down Expand Up @@ -73,6 +80,16 @@ class Connection extends EventEmitter
const CONNECTION_NEEDED = 8; /* Internal state: connect() needed */
const CONNECTION_CLOSED = 9;

// Reference table: https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION
const TLS_MODE_DISABLE = 'disable';
const TLS_MODE_ALLOW = 'allow';
const TLS_MODE_PREFER = 'prefer';
const TLS_MODE_REQUIRE = 'require';
const TLS_MODE_VERIFY_CA = 'verify-ca';
const TLS_MODE_VERIFY_FULL = 'verify-full';
const TLS_MODE_LIST_FULL = [self::TLS_MODE_DISABLE, self::TLS_MODE_ALLOW, self::TLS_MODE_PREFER, self::TLS_MODE_REQUIRE, self::TLS_MODE_VERIFY_CA, self::TLS_MODE_VERIFY_FULL];
const TLS_MODE_LIST_REQUIRED = [self::TLS_MODE_REQUIRE, self::TLS_MODE_VERIFY_CA, self::TLS_MODE_VERIFY_FULL];

private $queryState;
private $queryType;
private $connStatus;
Expand Down Expand Up @@ -134,6 +151,8 @@ class Connection extends EventEmitter

/** @var bool */
private $auto_disconnect = false;
private $tls = self::TLS_MODE_PREFER;
private $tlsConnectorFlags = [];
private $password;

public function __construct(array $parameters, LoopInterface $loop, ConnectorInterface $connector = null)
Expand All @@ -158,6 +177,19 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
unset($parameters['password']);
}

if (array_key_exists('tls', $parameters)) {
if (!in_array($this->tls, self::TLS_MODE_LIST_FULL)) {
throw new \InvalidArgumentException('TLS mode must be one off "' . implode(', ', self::TLS_MODE_LIST_FULL) . ' but got "' . $parameters['tls'] . '" instead');
}
$this->tls = $parameters['tls'];
unset($parameters['tls']);
}

if (array_key_exists('tls_connector_flags', $parameters)) {
$this->tlsConnectorFlags = $parameters['tls_connector_flags'];
unset($parameters['tls_connector_flags']);
}

if (isset($parameters['auto_disconnect'])) {
$this->auto_disconnect = $parameters['auto_disconnect'];
unset($parameters['auto_disconnect']);
Expand All @@ -172,8 +204,17 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
$this->queryState = static::STATE_BUSY;
$this->queryType = static::QUERY_SIMPLE;
$this->connStatus = static::CONNECTION_NEEDED;
$this->socket = $connector ?: new Connector($loop);
$this->uri = 'tcp://' . $parameters['host'] . ':' . $parameters['port'];
$this->socket = $connector ?: new Connector($loop, [
'tls' => [
'verify_peer' => $this->tls === self::TLS_MODE_VERIFY_FULL,
'verify_peer_name' => $this->tls === self::TLS_MODE_VERIFY_FULL,
'allow_self_signed' => $this->tls !== self::TLS_MODE_VERIFY_FULL,
] + $this->tlsConnectorFlags,
]);
// We always url `opportunistic+tls` as scheme because the logic required for using `tcp` on TLS `disable`
// mode is more complex than worth it when connecting to the server. And the `SecureConnector` gives us a
// plaint text connection with all TLS flags already set and ready to use for all the other modes.
$this->uri = 'opportunistic+tls://' . $parameters['host'] . ':' . $parameters['port'];
$this->notificationSubject = new Subject();
$this->cancelPending = false;
$this->cancelRequested = false;
Expand All @@ -191,23 +232,43 @@ private function start()
$this->connStatus = static::CONNECTION_STARTED;

$this->socket->connect($this->uri)->then(
function (DuplexStreamInterface $stream) {
$this->stream = $stream;
$this->connStatus = static::CONNECTION_MADE;

$stream->on('close', [$this, 'onClose']);
function (OpportunisticTlsConnectionInterface $stream) {
(new Promise(function (callable $resolve, callable $reject) use ($stream) {
if ($this->tls !== self::TLS_MODE_DISABLE) {
first($stream)->then(function ($data) use ($resolve, $reject, $stream) {
if (trim($data) === 'S') {
$stream->enableEncryption()->then($resolve, $reject);
return;
}

if (in_array($this->tls, self::TLS_MODE_LIST_REQUIRED)) {
$reject(new \RuntimeException('Failed to encrypt connection while required'));
return;
}

$resolve($stream);
}, $reject);

$ssl = new SSLRequest();
$stream->write($ssl->encodedMessage());
return;
}

$stream->on('data', [$this, 'onData']);
$resolve($stream);
}))->then(function (DuplexStreamInterface $stream) {
$this->stream = $stream;
$this->connStatus = static::CONNECTION_MADE;

// $ssl = new SSLRequest();
// $stream->write($ssl->encodedMessage());
$stream->on('close', [$this, 'onClose']);
$stream->on('data', [$this, 'onData']);

$startupParameters = $this->parameters;
unset($startupParameters['host'], $startupParameters['port']);
$startupParameters = $this->parameters;
unset($startupParameters['host'], $startupParameters['port']);

$startup = new StartupMessage();
$startup->setParameters($startupParameters);
$stream->write($startup->encodedMessage());
$startup = new StartupMessage();
$startup->setParameters($startupParameters);
$stream->write($startup->encodedMessage());
});
},
function ($e) {
// connection error
Expand Down Expand Up @@ -596,11 +657,11 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use
$this->processQueue();

return new CallbackDisposable(function () use ($q) {
if ($this->currentCommand === $q && $q->isActive()) {
$this->cancelRequested = true;
}
$q->cancel();
});
if ($this->currentCommand === $q && $q->isActive()) {
$this->cancelRequested = true;
}
$q->cancel();
});
}
);

Expand Down
4 changes: 2 additions & 2 deletions tests/Integration/BoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class BoolTest extends TestCase
public function testBools()
{

$client = new Client(["user" => $this::getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()]);
$client = self::clientFromEnv(["user" => $this::getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()]);

$count = $client->query("SELECT * FROM thing");

Expand Down Expand Up @@ -57,7 +57,7 @@ function () use (&$completes, $client) {
*/
public function testBoolParam()
{
$client = new Client(["user" => $this::getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()]);
$client = self::clientFromEnv(["user" => $this::getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()]);

$args = [false, 1];

Expand Down
10 changes: 5 additions & 5 deletions tests/Integration/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class ClientTest extends TestCase
{
public function testClientReusesIdleConnection()
{
$client = new Client(["user" => $this->getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()], $this->getLoop());
$client = self::clientFromEnv(["user" => $this->getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()], $this->getLoop());

$hello = null;

Expand Down Expand Up @@ -79,7 +79,7 @@ function () {

public function testAutoDisconnect()
{
$client = new Client([
$client = self::clientFromEnv([
"user" => $this->getDbUser(),
"password" => $this::getDbUser(),
"database" => $this::getDbName(),
Expand Down Expand Up @@ -114,7 +114,7 @@ function () {

public function testSendingTwoQueriesRepeatedlyOnlyCreatesTwoConnections()
{
$client = new Client([
$client = self::clientFromEnv([
"user" => $this->getDbUser(),
"password" => $this::getDbUser(),
"database" => $this::getDbName(),
Expand Down Expand Up @@ -156,7 +156,7 @@ function () {

public function testMaxConnections()
{
$client = new Client([
$client = self::clientFromEnv([
"user" => $this->getDbUser(),
"password" => $this::getDbUser(),
"database" => $this::getDbName(),
Expand Down Expand Up @@ -197,7 +197,7 @@ function () {

public function testListen()
{
$client = new Client([
$client = self::clientFromEnv([
"user" => $this->getDbUser(),
"password" => $this::getDbUser(),
"database" => $this::getDbName(),
Expand Down
2 changes: 1 addition & 1 deletion tests/Integration/Md5PasswordTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Md5PasswordTest extends TestCase
{
public function testMd5Login()
{
$client = new Client([
$client = self::clientFromEnv([
"user" => "pgasyncpw",
"database" => $this->getDbName(),
"auto_disconnect" => true,
Expand Down
2 changes: 1 addition & 1 deletion tests/Integration/NullPasswordTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public function testNullPassword()
{
$this->markTestSkipped('Not using null password anymore. Maybe should setup tests to twst this again.');

$client = new Client([
$client = self::clientFromEnv([
"user" => $this::getDbUser(),
"database" => $this::getDbName(),
"password" => null
Expand Down
2 changes: 1 addition & 1 deletion tests/Integration/ScramSha256PasswordTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class ScramSha256PasswordTest extends TestCase
{
public function testScramSha256Login()
{
$client = new Client([
$client = self::clientFromEnv([
"user" => 'scram_user',
"database" => $this->getDbName(),
"auto_disconnect" => true,
Expand Down
Loading
Loading