Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ if (BOOST_REDIS_MAIN_PROJECT)
test
json
endian
compat
)

foreach(dep IN LISTS deps)
Expand Down
16 changes: 9 additions & 7 deletions include/boost/redis/adapter/detail/adapters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

#include <boost/redis/adapter/result.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/serialization.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/redis/response.hpp>

#include <boost/assert.hpp>
Expand Down Expand Up @@ -216,7 +216,12 @@ class general_aggregate<generic_flat_response> {
: tree_(c)
{ }

void on_init() { }
void on_init()
{
if (tree_->has_value()) {
tree_->value().notify_init();
}
}
void on_done()
{
BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer");
Expand Down Expand Up @@ -255,11 +260,8 @@ class general_aggregate<resp3::flat_tree> {
: tree_(c)
{ }

void on_init() { }
void on_done()
{
tree_->notify_done();
}
void on_init() { tree_->notify_init(); }
void on_done() { tree_->notify_done(); }

template <class String>
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
Expand Down
59 changes: 56 additions & 3 deletions include/boost/redis/impl/flat_tree.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,27 @@ inline void grow(flat_buffer& buff, std::size_t new_capacity, view_tree& nodes)
++buff.reallocs;
}

// Erases the first num_bytes bytes from the buffer by moving
// the remaining bytes forward. Rebases the strings in nodes as required.
inline void erase_first(flat_buffer& buff, std::size_t num_bytes, view_tree& nodes)
{
BOOST_ASSERT(num_bytes <= buff.size);
if (num_bytes > 0u) {
// If we have any data to move, we should always have a buffer
BOOST_ASSERT(buff.data.get() != nullptr);

// Record the old base
const char* old_base = buff.data.get() + num_bytes;

// Move all that we're gonna keep to the start of the buffer
auto bytes_left = buff.size - num_bytes;
std::memmove(buff.data.get(), old_base, bytes_left);

// Rebase strings
rebase_strings(nodes, old_base, buff.data.get());
}
}

// Appends a string to the buffer.
// Might rebase the string in nodes, but doesn't append any new node.
inline std::string_view append(flat_buffer& buff, std::string_view value, view_tree& nodes)
Expand All @@ -129,6 +150,8 @@ flat_tree::flat_tree(flat_tree const& other)
: data_{detail::copy_construct(other.data_)}
, view_tree_{other.view_tree_}
, total_msgs_{other.total_msgs_}
, node_tmp_offset_{other.node_tmp_offset_}
, data_tmp_offset_{other.data_tmp_offset_}
{
detail::rebase_strings(view_tree_, other.data_.data.get(), data_.data.get());
}
Expand All @@ -145,6 +168,8 @@ flat_tree& flat_tree::operator=(const flat_tree& other)

// Copy the other fields
total_msgs_ = other.total_msgs_;
node_tmp_offset_ = other.node_tmp_offset_;
data_tmp_offset_ = other.data_tmp_offset_;
}

return *this;
Expand All @@ -161,8 +186,15 @@ void flat_tree::reserve(std::size_t bytes, std::size_t nodes)

void flat_tree::clear() noexcept
{
data_.size = 0u;
view_tree_.clear();
// Discard everything except for the tmp area
view_tree_.erase(view_tree_.begin(), view_tree_.begin() + node_tmp_offset_);
node_tmp_offset_ = 0u;

// Do the same for the data area
detail::erase_first(data_, data_tmp_offset_, view_tree_);
data_tmp_offset_ = 0u;

// We now have no messages
total_msgs_ = 0u;
}

Expand All @@ -180,10 +212,31 @@ void flat_tree::push(node_view const& nd)
});
}

void flat_tree::notify_init()
{
// Discard any data in the tmp area, as it belongs to an operation that never finished
BOOST_ASSERT(node_tmp_offset_ <= view_tree_.size());
BOOST_ASSERT(data_tmp_offset_ <= data_.size);
view_tree_.resize(node_tmp_offset_);
data_.size = data_tmp_offset_;
}

void flat_tree::notify_done()
{
++total_msgs_;
node_tmp_offset_ = view_tree_.size();
data_tmp_offset_ = data_.size;
}

bool operator==(flat_tree const& a, flat_tree const& b)
{
// data is already taken into account by comparing the nodes.
return a.view_tree_ == b.view_tree_ && a.total_msgs_ == b.total_msgs_;
// Only committed nodes should be taken into account.
auto a_nodes = a.get_view();
auto b_nodes = b.get_view();
return a_nodes.size() == b_nodes.size() &&
std::equal(a_nodes.begin(), a_nodes.end(), b_nodes.begin()) &&
a.total_msgs_ == b.total_msgs_;
}

} // namespace boost::redis::resp3
25 changes: 22 additions & 3 deletions include/boost/redis/resp3/flat_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/tree.hpp>

#include <boost/core/span.hpp>

#include <cstddef>
#include <memory>

Expand Down Expand Up @@ -164,7 +166,16 @@ class flat_tree {
*
* @returns The number of bytes in use in the data buffer.
*/
auto data_size() const noexcept -> std::size_t { return data_.size; }
auto data_size() const noexcept -> std::size_t { return data_tmp_offset_; }

/** @brief Returns the capacity of the node container.
*
* @par Exception safety
* No-throw guarantee.
*
* @returns The capacity of the object, in number of nodes.
*/
auto capacity() const noexcept -> std::size_t { return view_tree_.capacity(); }

/** @brief Returns the capacity of the data buffer, in bytes.
*
Expand All @@ -187,7 +198,7 @@ class flat_tree {
*
* @returns The nodes in the tree.
*/
auto get_view() const noexcept -> view_tree const& { return view_tree_; }
span<const node_view> get_view() const noexcept { return {view_tree_.data(), node_tmp_offset_}; }

/** @brief Returns the number of memory reallocations that took place in the data buffer.
*
Expand Down Expand Up @@ -215,14 +226,22 @@ class flat_tree {
private:
template <class> friend class adapter::detail::general_aggregate;

void notify_done() { ++total_msgs_; }
void notify_init();
void notify_done();

// Push a new node to the response
void push(node_view const& node);

detail::flat_buffer data_;
view_tree view_tree_;
std::size_t total_msgs_ = 0u;

// flat_tree supports a "temporary working area" for incrementally reading messages.
// Nodes in the tmp area are not part of the object representation until they
// are committed with notify_done().
// These offsets delimit this area.
std::size_t node_tmp_offset_ = 0u;
std::size_t data_tmp_offset_ = 0u;
};

/**
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ make_test(test_multiplexer)
make_test(test_parse_sentinel_response)
make_test(test_update_sentinel_list)
make_test(test_flat_tree)
make_test(test_generic_flat_response)
make_test(test_read_buffer)

# Tests that require a real Redis server
Expand Down
1 change: 1 addition & 0 deletions test/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ local tests =
test_parse_sentinel_response
test_update_sentinel_list
test_flat_tree
test_generic_flat_response
test_read_buffer
;

Expand Down
28 changes: 28 additions & 0 deletions test/test_conn_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/connection.hpp>
#include <boost/redis/response.hpp>

#include <boost/asio/detached.hpp>

Expand Down Expand Up @@ -180,4 +181,31 @@ BOOST_AUTO_TEST_CASE(exec_any_adapter)
BOOST_TEST(std::get<0>(res).value() == "PONG");
}

BOOST_AUTO_TEST_CASE(exec_generic_flat_response)
{
// Executing with a generic_flat_response works
request req;
req.push("PING", "PONG");
boost::redis::generic_flat_response resp;

net::io_context ioc;

auto conn = std::make_shared<connection>(ioc);

bool finished = false;

conn->async_exec(req, resp, [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->cancel();
finished = true;
});

run(conn);
ioc.run_for(test_timeout);
BOOST_TEST_REQUIRE(finished);

BOOST_TEST(resp.has_value());
BOOST_TEST(resp->get_view().front().value == "PONG");
}

} // namespace
Loading