//----------------------------------------------------------------------------- // ___ __ _ _ // / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __ // / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ / // / ___/ (_| | | \__ \ __/ /__| | | | | < // \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ . // //----------------------------------------------------------------------------- // Author: Kurt Sassenrath // Module: Server // // Server implementation. Currently, a monolithic server which: // * Communicates with users via TCP (msgpack). // * Runs the websocket server for overlays to read. // // Copyright (c) 2023 Kurt Sassenrath. // // License TBD. //----------------------------------------------------------------------------- #include "parselink/server.h" #include "parselink/logging.h" #include "parselink/msgpack/token/reader.h" #include "parselink/msgpack/token/views.h" #include "parselink/proto/session.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace parselink; #include using namespace parselink; using namespace std::chrono_literals; namespace net = boost::asio; using net::awaitable; using net::co_spawn; using net::deferred; using net::detached; using net::use_awaitable; //----------------------------------------------------------------------------- // TODO(ksassenrath): These are logging formatters for various boost/asio types. // Not all code is exposed to them, so they cannot be defined inside the // generic logging/formatters.h header. They should go somewhere else. //----------------------------------------------------------------------------- template <> struct parselink::logging::theme : parselink::logging::static_theme {}; template <> struct fmt::formatter : fmt::formatter { template constexpr auto format(auto const& v, FormatContext& ctx) const { return fmt::formatter::format(v.message(), ctx); } }; template <> struct fmt::formatter { template constexpr auto parse(ParseContext& ctx) -> decltype(ctx.begin()) { return ctx.begin(); } template auto format(msgpack::token const& v, FormatContext& ctx) const { using parselink::logging::themed_arg; auto out = fmt::format_to( ctx.out(), "()))); break; case msgpack::format::type::signed_int: out = fmt::format_to( out, "{}", themed_arg(*(v.get()))); break; case msgpack::format::type::boolean: out = fmt::format_to(out, "{}", themed_arg(*(v.get()))); break; case msgpack::format::type::string: out = fmt::format_to( out, "{}", themed_arg(*(v.get()))); break; case msgpack::format::type::binary: out = fmt::format_to(out, "{}", themed_arg(*(v.get>()))); break; case msgpack::format::type::map: out = fmt::format_to(out, "(arity: {})", themed_arg(v.get()->count)); break; case msgpack::format::type::array: out = fmt::format_to(out, "(arity: {})", themed_arg(v.get()->count)); break; case msgpack::format::type::nil: out = fmt::format_to(out, "(nil)"); break; case msgpack::format::type::invalid: out = fmt::format_to(out, "(invalid)"); break; default: break; } return fmt::format_to(out, ">"); } }; template concept endpoint = requires(T const& t) { { t.address() }; { t.port() }; }; template struct parselink::logging::theme : parselink::logging::static_theme {}; template struct fmt::formatter : fmt::formatter { template constexpr auto format(auto const& v, FormatContext& ctx) const { return fmt::format_to( ctx.out(), "{}:{}", v.address().to_string(), v.port()); } }; //----------------------------------------------------------------------------- // End formatters //----------------------------------------------------------------------------- namespace { logging::logger logger("server"); constexpr auto no_ex_coro = net::as_tuple(use_awaitable); constexpr auto no_ex_defer = net::as_tuple(deferred); } // namespace class user_connection; class monolithic_server : public server { public: monolithic_server(std::string_view address, std::uint16_t user_port, std::uint16_t websocket_port); std::error_code run() noexcept override; net::awaitable> create_session( std::shared_ptr const& conn, proto::connect_info const& info); private: awaitable user_listen(); std::unordered_set, std::equal_to<>> sessions_; net::io_context io_context_; net::io_context::strand session_strand_; net::ip::address addr_; std::uint16_t user_port_; std::uint16_t websocket_port_; }; class user_connection : public std::enable_shared_from_this { public: user_connection(monolithic_server& server, net::ip::tcp::socket sock) : server_(server) , socket_(std::move(sock)) {} ~user_connection() { stop(); } void stop() { logger.debug("Connection to {} closed.", socket_.remote_endpoint()); boost::system::error_code ec; socket_.shutdown(net::ip::tcp::socket::shutdown_both, ec); socket_.close(); } void start() { logger.debug("New connection from {}", socket_.remote_endpoint()); co_spawn( socket_.get_executor(), [self = shared_from_this()] { return self->await_connect(); }, detached); } awaitable> read_message( std::vector& buffer) noexcept { // Use a small buffer on the stack to read the initial header. std::array hdrbuff; auto [ec, n] = co_await socket_.async_read_some( net::buffer(hdrbuff), no_ex_coro); if (ec) { logger.error("Reading hdr from user socket failed: {}", ec); co_return tl::make_unexpected(proto::error::system_error); } logger.debug("Read {} bytes from client: {}", n, std::span(hdrbuff.data(), n)); auto maybe_hdr = proto::parse_header(std::span(hdrbuff.data(), n)); if (!maybe_hdr) { logger.error("Unable to parse header: {}", maybe_hdr.error()); co_return tl::make_unexpected(maybe_hdr.error()); } // TODO(ksassenrath): Replace with specialized allocator. buffer.resize(maybe_hdr->message_size); // Copy remaining portion of message in initial read to the message // buffer. std::copy(std::next(hdrbuff.begin(), maybe_hdr->bytes_read), std::next(hdrbuff.begin(), n), buffer.begin()); auto span = std::span( buffer.begin() + n - maybe_hdr->bytes_read, buffer.end()); // Buffer remaining message. std::size_t amt = 0; while (amt < span.size()) { auto subsp = span.subspan(amt); auto [ec, n] = co_await socket_.async_read_some( net::buffer(subsp), no_ex_coro); logger.debug("Read {} bytes, total is now {}", n, amt + n); if (ec || n == 0) { logger.error("Reading msg from user socket failed: {}", ec); co_return tl::make_unexpected(proto::error::system_error); } amt += n; } co_return tl::monostate{}; } awaitable await_connect() noexcept { std::vector msgbuf; if (auto maybe_msg = co_await read_message(msgbuf); !maybe_msg) { logger.debug("returning"); co_return; } auto reader = msgpack::token_reader(msgbuf); std::array tokens; auto connect_info = reader.read_some(tokens) .map_error([](auto) { return proto::error::bad_data; }) .and_then(proto::parse_connect); if (!connect_info) { logger.error("Session failed: {}", connect_info.error()); co_return; } co_await server_.create_session(shared_from_this(), *connect_info); } enum class state { init, authenticated, active }; monolithic_server& server_; net::ip::tcp::socket socket_; proto::session* session_; }; monolithic_server::monolithic_server(std::string_view address, std::uint16_t user_port, std::uint16_t websocket_port) : io_context_{1} , session_strand_(io_context_) , addr_(net::ip::address::from_string(std::string{address})) , user_port_{user_port} , websocket_port_{websocket_port} { logger.debug("Creating monolithic_server(address = {}, user_port = {}, " "websocket_port = {})", address, user_port_, websocket_port_); } awaitable monolithic_server::user_listen() { auto exec = co_await net::this_coro::executor; net::ip::tcp::acceptor acceptor{exec, {addr_, user_port_}}; while (true) { std::make_shared( *this, co_await acceptor.async_accept(use_awaitable)) ->start(); } } std::error_code monolithic_server::run() noexcept { logger.info("Starting server."); net::signal_set signals(io_context_, SIGINT, SIGTERM); signals.async_wait([&](auto, auto) { logger.info("Received signal... Shutting down."); io_context_.stop(); }); co_spawn(io_context_, user_listen(), detached); io_context_.run(); return {}; } net::awaitable> monolithic_server::create_session(std::shared_ptr const& conn, proto::connect_info const& info) { // Move to session strand. co_await net::post(session_strand_, net::bind_executor(session_strand_, use_awaitable)); proto::session session(info.user_id, {}); // Now, update the map of sessions co_return tl::make_unexpected(proto::error::unsupported); } std::unique_ptr parselink::make_server(std::string_view address, std::uint16_t user_port, std::uint16_t websocket_port) { using impl = monolithic_server; return std::make_unique(address, user_port, websocket_port); }