From 9164e7dc601a84cf01a6c4e3553b6a5cae366a37 Mon Sep 17 00:00:00 2001 From: Kurt Sassenrath Date: Tue, 19 Sep 2023 12:26:22 -0700 Subject: [PATCH] WIP Message --- source/BUILD | 3 +- source/include/parselink/message/message.h | 78 +++++++++++++++++++ .../include/parselink/msgpack/token/reader.h | 1 + source/message/builder.cpp | 0 source/server.cpp | 59 +++++++++----- 5 files changed, 119 insertions(+), 22 deletions(-) create mode 100644 source/include/parselink/message/message.h create mode 100644 source/message/builder.cpp diff --git a/source/BUILD b/source/BUILD index b87c4c3..994c91b 100644 --- a/source/BUILD +++ b/source/BUILD @@ -29,7 +29,8 @@ cc_binary( ], deps = [ "headers", + "msgpack", "@boost//:beast", - "//source/common" + "//source/common", ], ) diff --git a/source/include/parselink/message/message.h b/source/include/parselink/message/message.h new file mode 100644 index 0000000..61bb25b --- /dev/null +++ b/source/include/parselink/message/message.h @@ -0,0 +1,78 @@ +//----------------------------------------------------------------------------- +// ___ __ _ _ +// / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __ +// / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ / +// / ___/ (_| | | \__ \ __/ /__| | | | | < +// \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ . +// +//----------------------------------------------------------------------------- +// Author: Kurt Sassenrath +// Module: Message +// +// Message types. +// +// Copyright (c) 2023 Kurt Sassenrath. +// +// License TBD. +//----------------------------------------------------------------------------- +#ifndef message_0c61530748b9f966 +#define message_0c61530748b9f966 + +#include +#include + +#include + +namespace parselink { +namespace message { + +enum class error { + bad_magic, // Did not get the message magic expected + too_large, // The message size was too large. + unknown_type, // The message type is not known +}; + +// Parselink messages are encoded with MessagePack and take the form of: +// | magic | size | content | +// - [magic] is the string "prs", in "fixstr" format. +// - [size] is the size of the message, not including this header. +// - [content] is [size] bytes of MessagePack data, including the +// specific type of message presented. + +// This class is responsible for consuming buffer data and yielding a message +// instance when complete. Will throw an error if data is incorrect. +// +class builder { +public: + // For now, builders don't manage any buffers themselves. Later, that + // may change. + builder() = default; + + // Reset the builder to its initial state. This means any partially-decoded + // message data will be lost. + void reset() noexcept; + + // How many bytes are needed to perform a meaningful amount of work. + std::size_t bytes_needed() noexcept; + + // Process data from a buffer, building messages. Returns the number of + // bytes read from the buffer for the caller's bookkeeping. May yield a + // message in addition. + std::size_t process(std::span buffer) noexcept; + +private: + enum class state { + magic, + size, + payload + }; + + state state_{state::magic}; + std::size_t payload_size_{}; + std::size_t payload_remaining_{}; +}; + +} // namespace message +} // namespace parselink + +#endif // message_0c61530748b9f966 diff --git a/source/include/parselink/msgpack/token/reader.h b/source/include/parselink/msgpack/token/reader.h index f244ad4..06c561f 100644 --- a/source/include/parselink/msgpack/token/reader.h +++ b/source/include/parselink/msgpack/token/reader.h @@ -251,6 +251,7 @@ public: // fully parse a token, then incomplete_message is returned. constexpr tl::expected, error> read_some( std::span token_buffer) noexcept; + private: std::span data_; decltype(data_)::iterator curr_; diff --git a/source/message/builder.cpp b/source/message/builder.cpp new file mode 100644 index 0000000..e69de29 diff --git a/source/server.cpp b/source/server.cpp index 47c8f85..9769363 100644 --- a/source/server.cpp +++ b/source/server.cpp @@ -19,7 +19,7 @@ //----------------------------------------------------------------------------- #include -#include +#include #include #include @@ -91,6 +91,41 @@ namespace { constexpr auto no_ex_defer = net::as_tuple(deferred); } +class user_session : public std::enable_shared_from_this { +public: + user_session(net::ip::tcp::socket sock) : socket_(std::move(sock)) {} + ~user_session() { + logger.debug("Closing connection to {}", socket_.remote_endpoint()); + } + + void start() { + logger.debug("New connection from {}", socket_.remote_endpoint()); + co_spawn(socket_.get_executor(), [self = shared_from_this()]{ + return self->reader(); + }, detached); + } + + awaitable reader() { + std::array buffer; + auto [ec, n] = co_await socket_.async_read_some( + net::buffer(buffer), no_ex_coro); + if (ec) { + logger.error("Reading from user socket failed: {}", ec); + co_return; + } + logger.info("Read {} bytes from client: {}", n, + std::string_view(reinterpret_cast(buffer.data()), n)); + } + + enum class state { + init, + authenticated, + active + }; + + net::ip::tcp::socket socket_; +}; + class monolithic_server : public server { public: monolithic_server(std::string_view address, std::uint16_t user_port, @@ -100,7 +135,6 @@ public: private: - awaitable echo(net::ip::tcp::socket socket); awaitable user_listen(); net::io_context io_context_; @@ -120,29 +154,12 @@ monolithic_server::monolithic_server(std::string_view address, "websocket_port = {})", address, user_port_, websocket_port_); } -awaitable monolithic_server::echo(net::ip::tcp::socket socket) { - std::array buffer; - while (true) { - auto [ec, n] = co_await socket.async_read_some(net::buffer(buffer), no_ex_coro); - if (ec) { - logger.error("Read from socket failed: {}", ec); - co_return; - } - auto [ec2, x] = co_await net::async_write(socket, net::buffer(buffer, n), no_ex_coro); - if (ec2) { - logger.error("Write to socket failed: {}", ec); - co_return; - } - } -} - awaitable monolithic_server::user_listen() { auto exec = co_await net::this_coro::executor; net::ip::tcp::acceptor acceptor{exec, {addr_, user_port_}}; while (true) { - auto socket = co_await acceptor.async_accept(use_awaitable); - logger.debug("Accepted new connection from {}", socket.remote_endpoint()); - co_spawn(exec, echo(std::move(socket)), detached); + std::make_shared( + co_await acceptor.async_accept(use_awaitable))->start(); } }