//----------------------------------------------------------------------------- // ___ __ _ _ // / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __ // / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ / // / ___/ (_| | | \__ \ __/ /__| | | | | < // \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ . // //----------------------------------------------------------------------------- // Author: Kurt Sassenrath // Module: Server // // Server implementation. Currently, a monolithic server which: // * Communicates with users via TCP (msgpack). // * Runs the websocket server for overlays to read. // // Copyright (c) 2023 Kurt Sassenrath. // // License TBD. //----------------------------------------------------------------------------- #include "parselink/logging.h" #include "parselink/server.h" #include #include #include #include #include #include #include #include #include #include using namespace parselink; namespace net = boost::asio; using net::co_spawn; using net::awaitable; using net::use_awaitable; using net::deferred; using net::detached; //----------------------------------------------------------------------------- // TODO(ksassenrath): These are logging formatters for various boost/asio types. // Not all code is exposed to them, so they cannot be defined inside the // generic logging/formatters.h header. They should go somewhere else. //----------------------------------------------------------------------------- template <> struct parselink::logging::theme : parselink::logging::static_theme {}; template <> struct fmt::formatter : fmt::formatter { template constexpr auto format(auto const& v, FormatContext& ctx) const { return fmt::formatter::format(v.message(), ctx); } }; template concept endpoint = requires(T const& t) { {t.address()}; {t.port()}; }; template struct parselink::logging::theme : parselink::logging::static_theme {}; template struct fmt::formatter : fmt::formatter { template constexpr auto format(auto const& v, FormatContext& ctx) const { return fmt::format_to(ctx.out(), "{}:{}", v.address().to_string(), v.port()); } }; //----------------------------------------------------------------------------- // End formatters //----------------------------------------------------------------------------- namespace { logging::logger logger("server"); constexpr auto no_ex_coro = net::as_tuple(use_awaitable); constexpr auto no_ex_defer = net::as_tuple(deferred); } class user_session : public std::enable_shared_from_this { public: user_session(net::ip::tcp::socket sock) : socket_(std::move(sock)) {} ~user_session() { logger.debug("Closing connection to {}", socket_.remote_endpoint()); } void start() { logger.debug("New connection from {}", socket_.remote_endpoint()); co_spawn(socket_.get_executor(), [self = shared_from_this()]{ return self->reader(); }, detached); } awaitable reader() { std::array buffer; auto [ec, n] = co_await socket_.async_read_some( net::buffer(buffer), no_ex_coro); if (ec) { logger.error("Reading from user socket failed: {}", ec); co_return; } logger.info("Read {} bytes from client: {}", n, std::string_view(reinterpret_cast(buffer.data()), n)); } enum class state { init, authenticated, active }; net::ip::tcp::socket socket_; }; class monolithic_server : public server { public: monolithic_server(std::string_view address, std::uint16_t user_port, std::uint16_t websocket_port); std::error_code run() noexcept override; private: awaitable user_listen(); net::io_context io_context_; net::ip::address addr_; std::uint16_t user_port_; std::uint16_t websocket_port_; }; monolithic_server::monolithic_server(std::string_view address, std::uint16_t user_port, std::uint16_t websocket_port) : io_context_{1} , addr_(net::ip::address::from_string(std::string{address})) , user_port_{user_port} , websocket_port_{websocket_port} { logger.debug("Creating monolithic_server(address = {}, user_port = {}, " "websocket_port = {})", address, user_port_, websocket_port_); } awaitable monolithic_server::user_listen() { auto exec = co_await net::this_coro::executor; net::ip::tcp::acceptor acceptor{exec, {addr_, user_port_}}; while (true) { std::make_shared( co_await acceptor.async_accept(use_awaitable))->start(); } } std::error_code monolithic_server::run() noexcept { logger.info("Starting server."); net::signal_set signals(io_context_, SIGINT, SIGTERM); signals.async_wait([&](auto, auto){ logger.info("Received signal... Shutting down."); io_context_.stop(); }); co_spawn(io_context_, user_listen(), detached); io_context_.run(); return {}; } std::unique_ptr parselink::make_server(std::string_view address, std::uint16_t user_port, std::uint16_t websocket_port) { using impl = monolithic_server; return std::make_unique(address, user_port, websocket_port); }