Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
6fa34c2
Initial impl
anarthal Dec 16, 2025
cef6125
pubsub_state impl
anarthal Dec 16, 2025
67cb8e0
Initial request impl
anarthal Dec 19, 2025
f1e9772
start_command
anarthal Dec 19, 2025
278a99f
parsing workaround
anarthal Dec 19, 2025
e502703
clear and append
anarthal Dec 19, 2025
e820b45
some namespace changes
anarthal Dec 19, 2025
1a0d94c
namespace moving
anarthal Dec 19, 2025
7e9ab02
hide constructor
anarthal Dec 19, 2025
3a31d23
hide parse_last_argument
anarthal Dec 19, 2025
ddea2a9
Make the setting global
anarthal Dec 19, 2025
0c79084
clean multiplexer up
anarthal Dec 19, 2025
f611ced
Update the example
anarthal Dec 19, 2025
14af2c9
Use the new extension point in json
anarthal Dec 19, 2025
9ed3825
Restore add_bulk
anarthal Dec 19, 2025
45f76e7
Merge branch 'develop' into feature/pubsub-state-restoration
anarthal Jan 7, 2026
706a1cd
Revert customization point
anarthal Jan 7, 2026
3bb25f5
initial (pun)subscribe functions
anarthal Jan 7, 2026
c325e58
Proper implementation of pubsub methods
anarthal Jan 7, 2026
5ef0883
Remove config::restore_pubsub_state
anarthal Jan 7, 2026
fa5a74f
cleanup
anarthal Jan 7, 2026
a37b5c2
Rename to subscription_tracker
anarthal Jan 7, 2026
7e35895
build errors
anarthal Jan 7, 2026
e12427d
Update example
anarthal Jan 7, 2026
c7aa9fe
Update discussion
anarthal Jan 7, 2026
5e51e51
Update the chat room example
anarthal Jan 7, 2026
fc14d1a
rename file
anarthal Jan 7, 2026
4a222ce
fix command typo
anarthal Jan 7, 2026
b08f2c8
Integ test
anarthal Jan 7, 2026
89c516a
verify subscriptions
anarthal Jan 8, 2026
941d633
Use restoring pubsub in some tests
anarthal Jan 8, 2026
8e53088
update request example
anarthal Jan 8, 2026
6fc65f5
remove change type none
anarthal Jan 8, 2026
c8ee2c1
first request test
anarthal Jan 8, 2026
c5f31d1
Implement request::append
anarthal Jan 8, 2026
6ea8edb
push_range oubsub
anarthal Jan 8, 2026
e621d4a
append existing tests
anarthal Jan 8, 2026
83356c8
append tests
anarthal Jan 8, 2026
ab35372
subscribe tests
anarthal Jan 8, 2026
85bf939
reduce duplication
anarthal Jan 8, 2026
8ef4dc8
unsubscribe tests
anarthal Jan 8, 2026
1a3c7d7
reduce duplication 2
anarthal Jan 8, 2026
2c0aba0
psubscribe
anarthal Jan 8, 2026
738283d
punsubscribe
anarthal Jan 8, 2026
10050ad
clear tests
anarthal Jan 8, 2026
92980bd
mix pubsub and regular
anarthal Jan 8, 2026
0edc134
tracker 1st test
anarthal Jan 8, 2026
adf8a65
psubscribe
anarthal Jan 8, 2026
7ec85cc
subs psubs same arg
anarthal Jan 8, 2026
278c96b
more tests
anarthal Jan 8, 2026
4d0eeea
finished tracker test
anarthal Jan 8, 2026
c57531c
Remove commit_change
anarthal Jan 8, 2026
16ddd82
rename
anarthal Jan 8, 2026
42b390f
Fix test_exec_fsm
anarthal Jan 8, 2026
55bb0d7
rename tracker
anarthal Jan 8, 2026
1db2055
exec success tracking
anarthal Jan 8, 2026
1a3e7cf
exec tracking err
anarthal Jan 8, 2026
e801744
fix test compose_setup_request
anarthal Jan 8, 2026
ded328d
reduce duplication
anarthal Jan 8, 2026
ba3f329
verify leftover cleanup
anarthal Jan 8, 2026
37ca966
new tests
anarthal Jan 8, 2026
66a4859
static_assert
anarthal Jan 8, 2026
30da6c5
Update reference docs
anarthal Jan 8, 2026
3b9b7d2
fix test_setup_adapter
anarthal Jan 8, 2026
4ae2a43
Fix run_fsm test
anarthal Jan 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,32 +93,39 @@ The coroutine below shows how to use it


```cpp
auto
receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
request req;
req.push("SUBSCRIBE", "channel");

flat_tree resp;
generic_flat_response resp;
conn->set_receive_response(resp);

// Loop while reconnection is enabled
while (conn->will_reconnect()) {

// Reconnect to channels.
co_await conn->async_exec(req);
// Subscribe to the channel 'mychannel'. You can add any number of channels here.
request req;
req.subscribe({"mychannel"});
co_await conn->async_exec(req);

// You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored
// in resp. If the connection encounters a network error and reconnects to the server,
// it will automatically subscribe to 'mychannel' again. This is transparent to the user.

// Loop to read Redis push messages.
for (error_code ec;;) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));

// Check for errors and cancellations
if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) {
std::cerr << "Error during receive2: " << ec << std::endl;
break;
}

// Loop reading Redis pushes.
for (error_code ec;;) {
co_await conn->async_receive2(resp, redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem : resp.value().get_view())
std::cout << elem.value << "\n";

// Use the response resp in some way and then clear it.
...
std::cout << std::endl;

resp.clear();
}
resp.value().clear();
}
}
```
Expand Down
47 changes: 27 additions & 20 deletions doc/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,32 +104,39 @@ The coroutine below shows how to use it

[source,cpp]
----
auto
receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
request req;
req.push("SUBSCRIBE", "channel");

flat_tree resp;
generic_flat_response resp;
conn->set_receive_response(resp);

// Loop while reconnection is enabled
while (conn->will_reconnect()) {

// Reconnect to channels.
co_await conn->async_exec(req);
// Subscribe to the channel 'mychannel'. You can add any number of channels here.
request req;
req.subscribe({"mychannel"});
co_await conn->async_exec(req);

// You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored
// in resp. If the connection encounters a network error and reconnects to the server,
// it will automatically subscribe to 'mychannel' again. This is transparent to the user.

// Loop to read Redis push messages.
for (error_code ec;;) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));

// Check for errors and cancellations
if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) {
std::cerr << "Error during receive2: " << ec << std::endl;
break;
}

// Loop reading Redis pushes.
for (error_code ec;;) {
co_await conn->async_receive2(resp, redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem : resp.value().get_view())
std::cout << elem.value << "\n";

// Use the response here and then clear it.
...
std::cout << std::endl;

resp.clear();
}
resp.value().clear();
}
}
----
Expand Down
2 changes: 1 addition & 1 deletion doc/modules/ROOT/pages/requests_responses.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ must **NOT** be included in the response tuple. For example, the following reque
----
request req;
req.push("PING");
req.push("SUBSCRIBE", "channel");
req.subscribe({"channel"});
req.push("QUIT");
----

Expand Down
52 changes: 34 additions & 18 deletions example/cpp20_chat_room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/signal_set.hpp>

#include <exception>
#include <iostream>
#include <unistd.h>

Expand All @@ -40,31 +41,44 @@ using namespace std::chrono_literals;
// Chat over Redis pubsub. To test, run this program from multiple
// terminals and type messages to stdin.

namespace {

auto rethrow_on_error = [](std::exception_ptr exc) {
if (exc)
std::rethrow_exception(exc);
};

auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
{
request req;
req.push("SUBSCRIBE", "channel");

// Set the receive response, so pushes are stored in resp
generic_flat_response resp;
conn->set_receive_response(resp);

while (conn->will_reconnect()) {
// Subscribe to channels.
co_await conn->async_exec(req);
// Subscribe to the channel 'channel'. Using request::subscribe()
// (instead of request::push()) makes the connection re-subscribe
// to 'channel' whenever it re-connects to the server.
request req;
req.subscribe({"channel"});
co_await conn->async_exec(req);

// Loop reading Redis push messages.
for (error_code ec;;) {
co_await conn->async_receive2(redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
for (error_code ec;;) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));

for (auto const& elem: resp.value().get_view())
std::cout << elem.value << "\n";
// Check for errors and cancellations
if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) {
std::cerr << "Error during receive2: " << ec << std::endl;
break;
}

std::cout << std::endl;
// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem : resp.value().get_view())
std::cout << elem.value << "\n";

resp.value().clear();
}
std::cout << std::endl;

resp.value().clear();
}
}

Expand All @@ -81,15 +95,17 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
}
}

} // namespace

// Called from the main function (see main.cpp)
auto co_main(config cfg) -> awaitable<void>
{
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto stream = std::make_shared<stream_descriptor>(ex, ::dup(STDIN_FILENO));

co_spawn(ex, receiver(conn), detached);
co_spawn(ex, publisher(stream, conn), detached);
co_spawn(ex, receiver(conn), rethrow_on_error);
co_spawn(ex, publisher(stream, conn), rethrow_on_error);
conn->async_run(cfg, consign(detached, conn));

signal_set sig_set{ex, SIGINT, SIGTERM};
Expand Down
51 changes: 29 additions & 22 deletions example/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/channel_error.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/use_awaitable.hpp>
Expand All @@ -32,7 +33,7 @@ using asio::signal_set;
* To test send messages with redis-cli
*
* $ redis-cli -3
* 127.0.0.1:6379> PUBLISH channel some-message
* 127.0.0.1:6379> PUBLISH mychannel some-message
* (integer) 3
* 127.0.0.1:6379>
*
Expand All @@ -46,33 +47,39 @@ using asio::signal_set;
// Receives server pushes.
auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
request req;
req.push("SUBSCRIBE", "channel");

generic_flat_response resp;
conn->set_receive_response(resp);

// Loop while reconnection is enabled
while (conn->will_reconnect()) {
// Reconnect to the channels.
co_await conn->async_exec(req);

// Loop to read Redis push messages.
for (error_code ec;;) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
// Subscribe to the channel 'mychannel'. You can add any number of channels here.
request req;
req.subscribe({"mychannel"});
co_await conn->async_exec(req);

// You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored
// in resp. If the connection encounters a network error and reconnects to the server,
// it will automatically subscribe to 'mychannel' again. This is transparent to the user.
// You need to use specialized request::subscribe() function (instead of request::push)
// to enable this behavior.

// Loop to read Redis push messages.
for (error_code ec;;) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));

// Check for errors and cancellations
if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) {
std::cerr << "Error during receive2: " << ec << std::endl;
break;
}

// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem: resp.value().get_view())
std::cout << elem.value << "\n";
// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem : resp.value().get_view())
std::cout << elem.value << "\n";

std::cout << std::endl;
std::cout << std::endl;

resp.value().clear();
}
resp.value().clear();
}
}

Expand Down
7 changes: 5 additions & 2 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ struct connection_impl {
{
while (true) {
// Invoke the state machine
auto act = fsm_.resume(obj_->is_open(), self.get_cancellation_state().cancelled());
auto act = fsm_.resume(
obj_->is_open(),
obj_->st_,
self.get_cancellation_state().cancelled());

// Do what the FSM said
switch (act.type()) {
Expand Down Expand Up @@ -203,7 +206,7 @@ struct connection_impl {
});

return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
exec_op{this, notifier, exec_fsm(st_.mpx, std::move(info))},
exec_op{this, notifier, exec_fsm(std::move(info))},
token,
writer_cv_);
}
Expand Down
3 changes: 3 additions & 0 deletions include/boost/redis/detail/connection_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <boost/redis/config.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/subscription_tracker.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/node.hpp>
Expand Down Expand Up @@ -47,7 +48,9 @@ struct connection_state {
config cfg{};
multiplexer mpx{};
std::string diagnostic{}; // Used by the setup request and Sentinel
request setup_req{};
request ping_req{};
subscription_tracker tracker{};

// Sentinel stuff
lazy_random_engine eng{};
Expand Down
13 changes: 8 additions & 5 deletions include/boost/redis/detail/exec_fsm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

namespace boost::redis::detail {

struct connection_state;

// What should we do next?
enum class exec_action_type
{
Expand Down Expand Up @@ -54,16 +56,17 @@ class exec_action {

class exec_fsm {
int resume_point_{0};
multiplexer* mpx_{nullptr};
std::shared_ptr<multiplexer::elem> elem_;

public:
exec_fsm(multiplexer& mpx, std::shared_ptr<multiplexer::elem> elem) noexcept
: mpx_(&mpx)
, elem_(std::move(elem))
exec_fsm(std::shared_ptr<multiplexer::elem> elem) noexcept
: elem_(std::move(elem))
{ }

exec_action resume(bool connection_is_open, asio::cancellation_type_t cancel_state);
exec_action resume(
bool connection_is_open,
connection_state& st,
asio::cancellation_type_t cancel_state);
};

} // namespace boost::redis::detail
Expand Down
Loading