From 1ceccd072092faec571d8a9914ca3312e72ffd29 Mon Sep 17 00:00:00 2001 From: Kurt Sassenrath Date: Mon, 6 Nov 2023 07:06:19 -0800 Subject: [PATCH] WIP: Add hydrogen, session_id --- WORKSPACE | 24 +++++ include/parselink/proto/session.h | 66 ++++++++++++++ include/parselink/proto/session_id.h | 49 +++++++++++ source/proto/BUILD | 1 + source/proto/session.cpp | 30 +++++++ source/server.cpp | 126 +++++++++++++-------------- 6 files changed, 233 insertions(+), 63 deletions(-) create mode 100644 include/parselink/proto/session_id.h diff --git a/WORKSPACE b/WORKSPACE index e88bf1c..66e1c28 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -1,6 +1,7 @@ workspace(name = "parselink") load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") +load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") #=============================================================================== # Imported Bazel modules @@ -76,6 +77,29 @@ cc_library( """ ) +#------------------------------------------------------------------------------- +# libhydrogen: A lightweight cryptography library +#------------------------------------------------------------------------------- +hydrogen_commit = "c382cbb" +hydrogen_base_url = \ + "https://github.com/jedisct1/libhydrogen" +hydrogen_branch = "master" + +git_repository( + name = "hydrogen", + remote = hydrogen_base_url, + commit = hydrogen_commit, + build_file_content = +""" +cc_library( + name = "hydrogen", + srcs = glob(["hydrogen.c", "impl/**/*.h"]), + hdrs = ["hydrogen.h"], + visibility = ["//visibility:public"], +) +""", +) + #------------------------------------------------------------------------------- # ut: Unit test framework. # TODO(kss): Only if tests are needed? diff --git a/include/parselink/proto/session.h b/include/parselink/proto/session.h index cc2af7d..3495f3c 100644 --- a/include/parselink/proto/session.h +++ b/include/parselink/proto/session.h @@ -19,12 +19,28 @@ #define session_07eae057feface79 #include "parselink/msgpack/token.h" +#include #include +#include #include #include #include +template <> +struct std::hash> { + constexpr static std::uint32_t seed_var = 0x811c9dc5; + constexpr static std::uint32_t factor = 0x01000193; + + constexpr auto operator()(std::span data) const noexcept { + std::uint32_t digest = seed_var * factor; + for (auto byte : data) { + digest = (digest ^ static_cast(byte)) * factor; + } + return digest >> 8; + } +}; + namespace parselink { namespace proto { @@ -40,6 +56,8 @@ enum class error { struct header_info { std::uint32_t message_size; // Size of the message, minus the header. std::uint32_t bytes_read; // How many bytes of the buffer were used. + std::uint32_t bytes_parsed; // How many bytes were parsed as part of the + // header. }; struct connect_info { @@ -48,6 +66,54 @@ struct connect_info { std::span session_id; }; +template +struct transparent_hash { + using is_transparent = void; + using type = T; + using hash_type = std::hash; + + template + [[nodiscard]] constexpr auto operator()(Arg&& arg) const { + return hash_type{}(std::forward(arg)); + } +}; + +class session { +public: + using close_handle = std::function; + session(std::string_view user_id, close_handle hdl) noexcept; + + ~session(); + + std::span id() const noexcept { return id_; } + + std::string_view user_id() const noexcept { return user_id_; } + + auto last_activity() const noexcept { return last_activity_; } + + void update_last_activity( + std::chrono::steady_clock::time_point = + std::chrono::steady_clock::now()) noexcept {} + + constexpr bool operator==(std::span id) const noexcept { + return std::ranges::equal(id, id_); + } + +private: + std::array id_; + std::string user_id_; + close_handle closer_; + std::chrono::steady_clock::time_point last_activity_; +}; + +template <> +struct transparent_hash + : transparent_hash> { + [[nodiscard]] auto operator()(session const& s) const { + return std::hash>{}(s.id()); + } +}; + // Parse the protocol header out of a buffer. tl::expected parse_header( std::span buffer) noexcept; diff --git a/include/parselink/proto/session_id.h b/include/parselink/proto/session_id.h new file mode 100644 index 0000000..c6e7828 --- /dev/null +++ b/include/parselink/proto/session_id.h @@ -0,0 +1,49 @@ +//----------------------------------------------------------------------------- +// ___ __ _ _ +// / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __ +// / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ / +// / ___/ (_| | | \__ \ __/ /__| | | | | < +// \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ . +// +//----------------------------------------------------------------------------- +// Author: Kurt Sassenrath +// Module: proto +// +// Session ID. Used as a handle to access an existing user session, which can +// also be used to share parse data without any linking of users. +// +// Copyright (c) 2023 Kurt Sassenrath. +// +// License TBD. +//----------------------------------------------------------------------------- +#ifndef session_id_6598f9bae1cbb501 +#define session_id_6598f9bae1cbb501 + +#include +#include +#include + +namespace parselink { +namespace proto { + +struct session_id { + std::array bytes; + session_id() noexcept; + + [[nodiscard]] constexpr auto operator<=>( + session_id const& other) const noexcept { + return bytes <=> other.bytes; + } + + [[nodiscard]] constexpr auto operator<=>( + std::span other) const noexcept { + return std::lexicographical_compare_three_way(bytes.begin(), + bytes.end(), other.begin(), other.end(), + std::compare_three_way()); + } +}; + +} // namespace proto +} // namespace parselink + +#endif // session_id_6598f9bae1cbb501 diff --git a/source/proto/BUILD b/source/proto/BUILD index 41225b4..7301a79 100644 --- a/source/proto/BUILD +++ b/source/proto/BUILD @@ -9,6 +9,7 @@ cc_library( "//include/parselink:proto", "//include/parselink:msgpack", "//source/logging", + "@hydrogen", ], visibility = [ # TODO: Fix visibility diff --git a/source/proto/session.cpp b/source/proto/session.cpp index 9b20db8..8bdc97f 100644 --- a/source/proto/session.cpp +++ b/source/proto/session.cpp @@ -19,11 +19,28 @@ //----------------------------------------------------------------------------- #include "parselink/proto/session.h" +#include "hydrogen.h" #include "parselink/logging.h" #include "parselink/msgpack/token.h" #include +namespace { + +void ensure_initialized() { + static auto x [[maybe_unused]] = [] { return hydro_init(); }(); +} + +template +auto get_random_bytes() { + ensure_initialized(); + std::array out; + hydro_random_buf(out.data(), N); + return out; +} + +} // anonymous namespace + using namespace parselink; using namespace parselink::proto; @@ -169,3 +186,16 @@ tl::expected proto::parse_connect( return tl::make_unexpected(error::bad_data); } } + +session::session(std::string_view user_id, close_handle hdl) noexcept + : id_(get_random_bytes<32>()) + , user_id_(std::string{user_id}) + , closer_(std::move(hdl)) + , last_activity_(std::chrono::steady_clock::now()) { + logger.debug("New session with id {} created for {} {:x}", id_, user_id_, + transparent_hash{}(*this)); +} + +session::~session() { + if (closer_) closer_("Destroyed"); +} diff --git a/source/server.cpp b/source/server.cpp index c7fb087..193b186 100644 --- a/source/server.cpp +++ b/source/server.cpp @@ -36,7 +36,7 @@ #include #include #include -#include +#include #include @@ -154,13 +154,6 @@ constexpr auto no_ex_defer = net::as_tuple(deferred); class user_connection; -struct user_session { - std::weak_ptr conn; - std::string user_id; - std::array session_id; - std::chrono::system_clock::time_point expires_at; -}; - class monolithic_server : public server { public: monolithic_server(std::string_view address, std::uint16_t user_port, @@ -168,16 +161,16 @@ public: std::error_code run() noexcept override; - net::awaitable> create_session( - std::weak_ptr conn, + net::awaitable> create_session( + std::shared_ptr const& conn, proto::connect_info const& info); private: - friend user_session; - awaitable user_listen(); - std::map> active_sessions_; + std::unordered_set, + std::equal_to<>> + sessions_; net::io_context io_context_; net::io_context::strand session_strand_; @@ -192,7 +185,9 @@ public: : server_(server) , socket_(std::move(sock)) {} - ~user_connection() { + ~user_connection() { stop(); } + + void stop() { logger.debug("Connection to {} closed.", socket_.remote_endpoint()); boost::system::error_code ec; socket_.shutdown(net::ip::tcp::socket::shutdown_both, ec); @@ -207,81 +202,85 @@ public: detached); } - awaitable> - buffer_message(std::span buffer) noexcept { + awaitable> read_message( + std::vector& buffer) noexcept { + // Use a small buffer on the stack to read the initial header. + std::array hdrbuff; + auto [ec, n] = co_await socket_.async_read_some( + net::buffer(hdrbuff), no_ex_coro); + + if (ec) { + logger.error("Reading hdr from user socket failed: {}", ec); + co_return tl::make_unexpected(proto::error::system_error); + } + + logger.debug("Read {} bytes from client: {}", n, + std::span(hdrbuff.data(), n)); + + auto maybe_hdr = proto::parse_header(std::span(hdrbuff.data(), n)); + + if (!maybe_hdr) { + logger.error("Unable to parse header: {}", maybe_hdr.error()); + co_return tl::make_unexpected(maybe_hdr.error()); + } + + // TODO(ksassenrath): Replace with specialized allocator. + buffer.resize(maybe_hdr->message_size); + + // Copy remaining portion of message in initial read to the message + // buffer. + std::copy(std::next(hdrbuff.begin(), maybe_hdr->bytes_read), + std::next(hdrbuff.begin(), n), buffer.begin()); + + auto span = std::span( + buffer.begin() + n - maybe_hdr->bytes_read, buffer.end()); + + // Buffer remaining message. std::size_t amt = 0; - while (amt < buffer.size()) { - auto subsp = buffer.subspan(amt); + while (amt < span.size()) { + auto subsp = span.subspan(amt); auto [ec, n] = co_await socket_.async_read_some( net::buffer(subsp), no_ex_coro); logger.debug("Read {} bytes, total is now {}", n, amt + n); if (ec || n == 0) { - logger.error("Reading from user socket failed: {}", ec); - co_return tl::make_unexpected(ec); + logger.error("Reading msg from user socket failed: {}", ec); + co_return tl::make_unexpected(proto::error::system_error); } amt += n; } - co_return std::monostate{}; + + co_return tl::monostate{}; } awaitable await_connect() noexcept { - // Use a small buffer on the stack to read the initial header. - std::array buffer; - auto [ec, n] = co_await socket_.async_read_some( - net::buffer(buffer), no_ex_coro); + std::vector msgbuf; - if (ec) { - logger.error("Reading from user socket failed: {}", ec); + if (auto maybe_msg = co_await read_message(msgbuf); !maybe_msg) { + logger.debug("returning"); co_return; } - logger.debug("Read {} bytes from client: {}", n, - std::span(buffer.data(), n)); - - auto maybe_hdr = proto::parse_header(std::span(buffer.data(), n)); - - if (!maybe_hdr) { - logger.error("Unable to parse header: {}", maybe_hdr.error()); - co_return; - } - - // TODO(ksassenrath): Replace with specialized allocator. - auto msg = std::vector(maybe_hdr->message_size); - - // Copy remaining portion of message in initial read to the message - // buffer. - std::copy(std::next(buffer.begin(), maybe_hdr->bytes_read), - std::next(buffer.begin(), n), msg.begin()); - - auto msg_span = - std::span(msg.begin() + n - maybe_hdr->bytes_read, msg.end()); - - if (auto result = co_await buffer_message(msg_span); !result) { - logger.error("Unable to parse header: {}", result.error()); - co_return; - } - - auto reader = msgpack::token_reader(msg); + auto reader = msgpack::token_reader(msgbuf); std::array tokens; - auto session = + auto connect_info = reader.read_some(tokens) .map_error([](auto) { return proto::error::bad_data; }) .and_then(proto::parse_connect); - if (!session) { - logger.error("Session failed: {}", session.error()); + if (!connect_info) { + logger.error("Session failed: {}", connect_info.error()); co_return; } - co_await server_.create_session( - std::weak_ptr(shared_from_this()), *session); + co_await server_.create_session(shared_from_this(), *connect_info); } enum class state { init, authenticated, active }; monolithic_server& server_; net::ip::tcp::socket socket_; + proto::session* session_; }; monolithic_server::monolithic_server(std::string_view address, @@ -322,15 +321,16 @@ std::error_code monolithic_server::run() noexcept { return {}; } -net::awaitable> -monolithic_server::create_session(std::weak_ptr conn, - proto::connect_info const& session) { +net::awaitable> +monolithic_server::create_session(std::shared_ptr const& conn, + proto::connect_info const& info) { // Move to session strand. co_await net::post(session_strand_, net::bind_executor(session_strand_, use_awaitable)); - // Pretend that there's no open + proto::session session(info.user_id, {}); + // Now, update the map of sessions co_return tl::make_unexpected(proto::error::unsupported); }