//----------------------------------------------------------------------------- // ___ __ _ _ // / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __ // / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ / // / ___/ (_| | | \__ \ __/ /__| | | | | < // \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ . // //----------------------------------------------------------------------------- // Author: Kurt Sassenrath // Module: Server // // Server implementation. Currently, a monolithic server which: // * Communicates with users via TCP (msgpack). // * Runs the websocket server for overlays to read. // // Copyright (c) 2023 Kurt Sassenrath. // // License TBD. //----------------------------------------------------------------------------- #include "parselink/logging.h" #include "parselink/server.h" #include "parselink/msgpack/token/reader.h" #include "parselink/msgpack/token/views.h" #include "parselink/proto/message.h" #include #include #include #include #include #include #include #include #include #include #include using namespace parselink; namespace net = boost::asio; using net::co_spawn; using net::awaitable; using net::use_awaitable; using net::deferred; using net::detached; //----------------------------------------------------------------------------- // TODO(ksassenrath): These are logging formatters for various boost/asio types. // Not all code is exposed to them, so they cannot be defined inside the // generic logging/formatters.h header. They should go somewhere else. 
//----------------------------------------------------------------------------- template <> struct parselink::logging::theme : parselink::logging::static_theme {}; template <> struct fmt::formatter : fmt::formatter { template constexpr auto format(auto const& v, FormatContext& ctx) const { return fmt::formatter::format(v.message(), ctx); } }; template <> struct fmt::formatter { template constexpr auto parse(ParseContext& ctx) -> decltype(ctx.begin()) { return ctx.begin(); } template auto format(msgpack::token const& v, FormatContext& ctx) const { using parselink::logging::themed_arg; auto out = fmt::format_to(ctx.out(), "()))); break; case msgpack::format::type::signed_int: out = fmt::format_to(out, "{}", themed_arg(*(v.get()))); break; case msgpack::format::type::boolean: out = fmt::format_to(out, "{}", themed_arg(*(v.get()))); break; case msgpack::format::type::string: out = fmt::format_to(out, "{}", themed_arg(*(v.get()))); break; case msgpack::format::type::binary: out = fmt::format_to(out, "{}", themed_arg(*(v.get>()))); break; case msgpack::format::type::map: out = fmt::format_to(out, "(arity: {})", themed_arg(v.get()->count)); break; case msgpack::format::type::array: out = fmt::format_to(out, "(arity: {})", themed_arg(v.get()->count)); break; case msgpack::format::type::nil: out = fmt::format_to(out, "(nil)"); break; case msgpack::format::type::invalid: out = fmt::format_to(out, "(invalid)"); break; default: break; } return fmt::format_to(out, ">"); } }; template concept endpoint = requires(T const& t) { {t.address()}; {t.port()}; }; template struct parselink::logging::theme : parselink::logging::static_theme {}; template struct fmt::formatter : fmt::formatter { template constexpr auto format(auto const& v, FormatContext& ctx) const { return fmt::format_to(ctx.out(), "{}:{}", v.address().to_string(), v.port()); } }; //----------------------------------------------------------------------------- // End formatters 
//-----------------------------------------------------------------------------

// NOTE(review): as in the formatter section, every angle-bracketed span
// (template arguments such as vector/span/array element types and array
// extents) appears to have been stripped from this text. The code is kept
// token-for-token as found; restore those spans from the original file before
// building.

namespace {
    // Logger used by everything in this file, constructed with the name
    // "server".
    logging::logger logger("server");

    // Completion tokens wrapped in net::as_tuple. Where no_ex_coro is used
    // below, the awaited result is unpacked as [ec, n], i.e. the error code is
    // part of the result rather than being thrown.
    constexpr auto no_ex_coro = net::as_tuple(use_awaitable);
    // Not referenced in the visible part of this file.
    constexpr auto no_ex_defer = net::as_tuple(deferred);
}

// Holder for a payload vector. Not referenced in the visible part of this
// file.
struct msgbuf {
    std::vector payload;
};

// One connected TCP user. Instances are created with std::make_shared in
// monolithic_server::user_listen(); start() hands a shared_ptr to this object
// to the lambda it passes to co_spawn.
class user_session : public std::enable_shared_from_this {
public:
    // Takes ownership of an already-connected socket.
    user_session(net::ip::tcp::socket sock) : socket_(std::move(sock)) {}

    // Logs the peer, then shuts down and closes the socket.
    //
    // NOTE(review): remote_endpoint() and close() are called here without an
    // error_code argument. Per the Boost.Asio documentation those overloads
    // throw boost::system::system_error on failure (e.g. a peer that already
    // disconnected), and an exception escaping a destructor terminates the
    // program. The error_code-taking overloads, as already used for
    // shutdown(), would avoid that -- confirm and fix.
    ~user_session() {
        logger.debug("Closing connection to {}", socket_.remote_endpoint());
        boost::system::error_code ec;
        // Result deliberately ignored: ec is never inspected.
        socket_.shutdown(net::ip::tcp::socket::shutdown_both, ec);
        socket_.close();
    }

    // Logs the peer and spawns await_auth() on the socket's executor,
    // detached. Must be called on an object owned by a shared_ptr, since it
    // calls shared_from_this().
    //
    // NOTE(review): remote_endpoint() is again the throwing overload.
    void start() {
        logger.debug("New connection from {}", socket_.remote_endpoint());
        co_spawn(socket_.get_executor(), [self = shared_from_this()]{
            return self->await_auth();
        }, detached);
    }

    // Parses the message header at the start of `data`.
    //
    // The first token is compared against the string "prs"; the second token
    // is read as the packet size. On success, returns a vector whose capacity
    // was reserved to that size and whose contents are the bytes remaining in
    // the reader after the two header tokens. On failure, returns an
    // unexpected msgpack::error.
    //
    // NOTE(review): both failure branches call magic.error():
    //  * the first branch is also taken when `magic` holds the value `false`
    //    (token read fine but did not match "prs"), in which case there is no
    //    error to read;
    //  * the second branch is only reachable after `magic` was verified to
    //    hold `true`, so it never has an error to read -- it looks like
    //    sz.error() was intended, and the `*sz == 0` case has no error either.
    // Reading error() from an expected that holds a value is outside its
    // contract (see the tl::expected documentation) -- confirm and fix.
    tl::expected, msgpack::error> parse_header(
            std::span data) noexcept {
        auto reader = msgpack::token_reader(data);
        auto magic = reader.read_one().map(
            [](auto t){ return t == std::string_view{"prs"}; });
        if (magic && *magic) {
            logger.debug("Got magic from client");
        } else {
            logger.error("Failed to get magic from client: {}", magic);
            return tl::unexpected(magic.error());
        }

        auto sz = reader.read_one().and_then(
            [](auto t){ return t.template get(); });
        // A size of zero is treated the same as a failed read.
        if (sz && *sz) {
            logger.debug("Got packet size from client: {}", *sz);
        } else {
            logger.debug("Failed to get packet size from client: {}", sz);
            return tl::unexpected(magic.error());
        }

        // Copy the rest of the message to the buffer for parsing.
        // TODO(ksassenrath): Replace vector with custom buffer.
        //
        // NOTE(review): the expected message length is carried to
        // buffer_message() only through the vector's capacity. reserve()
        // guarantees a capacity of at least *sz, not exactly *sz, and if
        // reader.remaining() exceeds *sz the resize() below grows past it.
        std::vector msg;
        msg.reserve(*sz);
        msg.resize(reader.remaining());
        std::copy(reader.current(), reader.end(), msg.begin());
        return msg;
    }

    // Reads from the socket until `buffer` holds as many bytes as its capacity
    // on entry. Bytes already present (buffer.size() on entry) are kept and
    // new data is appended after them. Returns std::monostate on success, or
    // the error code from the failed read.
    //
    // NOTE(review): `total` is taken from capacity(), see the note in
    // parse_header(). Also, when a read returns n == 0 without an error, the
    // unexpected value returned wraps a non-error code.
    awaitable> buffer_message(std::vector& buffer) noexcept {
        auto amt = buffer.size();
        auto total = buffer.capacity();
        buffer.resize(total);
        while (amt < total) {
            // View over the not-yet-filled tail of the buffer.
            auto subf = std::span(buffer.begin() + amt, buffer.end());
            auto [ec, n] = co_await socket_.async_read_some(
                    net::buffer(subf), no_ex_coro);
            // Logged before the error check, so this line is emitted for
            // failed reads as well.
            logger.debug("Read {} bytes, total is now {}", n, amt + n);
            if (ec || n == 0) {
                logger.error("Reading from user socket failed: {}", ec);
                co_return tl::make_unexpected(ec);
            }
            amt += n;
        }
        co_return std::monostate{};
    }

    // Inspects the tokens of the first message from a client. Reads the
    // message type from the first token and, if present, logs every key/value
    // pair of the map view over the remaining tokens.
    //
    // NOTE(review): currently a stub -- `message` is never filled in, and the
    // function returns true whether or not the message type was read.
    // tokens.begin() is dereferenced without checking that `tokens` is
    // non-empty.
    tl::expected handle_auth(std::span tokens) {
        auto message_type = tokens.begin()->get();
        if (message_type) {
            logger.debug("Received '{}' packet. Parsing body", *message_type);
            proto::connect_message message;
            for (auto const& [k, v] : msgpack::map_view(tokens.subspan(1))) {
                logger.debug("Parsing {} -> {}", k, v);
            }
        } else {
            logger.error("Did not get message type: {}", message_type.error());
        }
        // The first token should include
        // (NOTE(review): the comment above is unfinished in the original.)
        return true;
    }

    // Coroutine driving the first exchange with a client: performs one read,
    // parses the header out of it, buffers the remainder of the message,
    // tokenizes it and passes the tokens to handle_auth(). Every failure is
    // logged and ends the coroutine.
    //
    // NOTE(review): the header is parsed from whatever the single initial
    // read returned; presumably the header is assumed to arrive in one piece
    // -- confirm.
    awaitable await_auth() noexcept {
        std::array buffer;
        auto [ec, n] = co_await socket_.async_read_some(
                net::buffer(buffer), no_ex_coro);
        if (ec) {
            logger.error("Reading from user socket failed: {}", ec);
            co_return;
        }

        logger.debug("Read {} bytes from client: {}", n,
                std::span(buffer.data(), n));

        // parse_header() logs its own failures, so none is logged here.
        auto hdr_result = parse_header(std::span(buffer.data(), n));
        if (!hdr_result) {
            co_return;
        }

        auto msg = std::move(*hdr_result);
        auto maybe_error = co_await buffer_message(msg);
        if (!maybe_error) {
            logger.error("Unable to buffer message: {}", maybe_error.error());
            co_return;
        }

        logger.trace("Message: {}", msg);
        auto reader = msgpack::token_reader(msg);
        std::array tokens;
        // `this` is captured by a lambda that runs inside this expression,
        // within the coroutine's own lifetime.
        auto parsed = reader.read_some(tokens).and_then(
                [this](auto c) {
                    for (auto t : c) logger.trace("{}", t);
                    return handle_auth(c);
                })
            .or_else([](auto const& error) {
                logger.error("Unable to parse msgpack tokens: {}", error);
            });
        if (!parsed) {
            co_return;
        }

        // Authenticate against database.
    }

    // Session states. Not referenced in the visible part of this file.
    enum class state {
        init,
        authenticated,
        active
    };

    net::ip::tcp::socket socket_;
};

// Server implementation that owns a single io_context and accepts user
// connections on it. The websocket port is stored and logged but not
// otherwise used in the visible part of this file.
class monolithic_server : public server {
public:
    monolithic_server(std::string_view address, std::uint16_t user_port,
            std::uint16_t websocket_port);
    std::error_code run() noexcept override;
private:
    // Accept loop for user connections.
    awaitable user_listen();

    net::io_context io_context_;
    net::ip::address addr_;
    std::uint16_t user_port_;
    std::uint16_t websocket_port_;
};

// Stores the ports and parses `address` into an IP address, then logs the
// configuration. The io_context is constructed with the argument 1.
//
// NOTE(review): net::ip::address::from_string() is called without an
// error_code, so presumably an unparsable `address` surfaces as an exception
// from this constructor -- confirm against the Boost.Asio documentation.
monolithic_server::monolithic_server(std::string_view address,
        std::uint16_t user_port, std::uint16_t websocket_port)
    : io_context_{1}
    , addr_(net::ip::address::from_string(std::string{address}))
    , user_port_{user_port}
    , websocket_port_{websocket_port} {
    logger.debug("Creating monolithic_server(address = {}, user_port = {}, "
            "websocket_port = {})", address, user_port_, websocket_port_);
}

// Opens an acceptor on addr_:user_port_ using the coroutine's executor and,
// for every accepted connection, creates a session and calls start() on it.
//
// NOTE(review): async_accept is awaited with plain use_awaitable rather than
// the as_tuple token used for reads, so an accept failure is presumably
// raised as an exception and ends this loop -- confirm that is intended.
awaitable monolithic_server::user_listen() {
    auto exec = co_await net::this_coro::executor;

    net::ip::tcp::acceptor acceptor{exec, {addr_, user_port_}};
    while (true) {
        std::make_shared(
                co_await acceptor.async_accept(use_awaitable))->start();
    }
}

// Runs the server: installs a SIGINT/SIGTERM handler that stops the
// io_context, spawns the accept loop detached, and blocks in
// io_context_.run() until it returns. Always returns a default-constructed
// std::error_code.
std::error_code monolithic_server::run() noexcept {
    logger.info("Starting server.");
    net::signal_set signals(io_context_, SIGINT, SIGTERM);
    // Capturing by reference is fine here: the handler can only run inside
    // io_context_.run() below, while `signals` and `this` are still alive.
    signals.async_wait([&](auto, auto){
        logger.info("Received signal... Shutting down.");
        io_context_.stop();
    });

    co_spawn(io_context_, user_listen(), detached);

    io_context_.run();
    return {};
}

// Factory for the server interface: constructs a monolithic_server with the
// given address and ports.
std::unique_ptr parselink::make_server(std::string_view address,
        std::uint16_t user_port, std::uint16_t websocket_port) {
    using impl = monolithic_server;
    return std::make_unique(address, user_port, websocket_port);
}