WIP Server stuff

This commit is contained in:
Kurt Sassenrath 2023-11-22 22:54:08 -08:00
parent cbca4be237
commit 84942171ea
13 changed files with 610 additions and 30 deletions

View File

@ -15,6 +15,14 @@ cc_library(
visibility = ["//visibility:public"],
)
cc_library(
name = "server",
hdrs = glob(["server/**/*.h"]),
includes = ["."],
deps = ["@expected", "//include:path"],
visibility = ["//visibility:public"],
)
cc_library(
name = "proto",
hdrs = glob(["proto/**/*.h"]),

View File

@ -91,7 +91,7 @@ struct fmt::formatter<std::errc> : fmt::formatter<std::error_code> {
template <typename FormatContext>
auto format(std::errc const& v, FormatContext& ctx) const {
return fmt::formatter<std::error_code>::format(
std::make_error_code(v), ctx);
std::make_error_code(v), ctx);
}
};

View File

@ -35,6 +35,7 @@
#include <system_error>
#include <type_traits>
#include <fmt/chrono.h>
#include <fmt/color.h>
#include <tl/expected.hpp>

View File

@ -0,0 +1,283 @@
//-----------------------------------------------------------------------------
// ___ __ _ _
// / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __
// / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ /
// / ___/ (_| | | \__ \ __/ /__| | | | | <
// \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ .
//
//-----------------------------------------------------------------------------
// Author: Kurt Sassenrath
// Module: msgpack
//
// "Token" writing implementation, which supports both tokens and deduction for
// common types.
//
// Copyright (c) 2023 Kurt Sassenrath.
//
// License TBD.
//-----------------------------------------------------------------------------
#ifndef msgpack_core_writer_ce48a51aa6ed0858
#define msgpack_core_writer_ce48a51aa6ed0858
#include "parselink/msgpack/core/error.h"
#include "parselink/msgpack/core/format.h"
#include "parselink/msgpack/util/endianness.h"
#include <limits>
#include <type_traits>
#include <tl/expected.hpp>
namespace msgpack {
enum class writer_error {
none,
unsupported,
not_implemented,
bad_value,
out_of_space
};
// Helper template for writing a non-integral datatype to an output.
// template <typename T>
// struct write_adapter {
// template <typename Itr>
// static constexpr tl::expected<Itr, error> write(T const& t);
//};
template <std::size_t N, typename Itr>
constexpr inline decltype(auto) write_bytes(
std::array<std::byte, N>&& data, Itr out) noexcept {
for (auto b : data) {
*out++ = b;
}
return out;
}
#if 0
// Figure out the smallest
namespace detail {
constexpr auto const& deduce_format(std::uint64_t value) {
if (value <= format::positive_fixint::mask) return format::
}
template <std::integral T>
struct write_adapter<T> {
static constexpr auto size(T t) noexcept {
}
}
} // namespace detail
#endif
template <typename T>
struct write_adapter {};
template <std::integral T>
struct write_adapter<T> {
static constexpr auto size(T) noexcept { return sizeof(T); }
template <typename Itr>
static constexpr auto write(T t, Itr out) noexcept {
return write_bytes(detail::raw_cast(host_to_be(t)), out);
}
};
template <>
struct write_adapter<std::string_view> {
static constexpr auto size(std::string_view str) noexcept {
return str.size();
}
template <typename Itr>
static constexpr auto write(std::string_view str, Itr out) noexcept {
std::byte const* beg =
reinterpret_cast<std::byte const*>(&*str.begin());
std::copy(beg, beg + str.size(), out);
return out += str.size();
}
};
template <>
struct write_adapter<std::span<std::byte const>> {
static constexpr auto size(std::span<std::byte const> bytes) noexcept {
return bytes.size();
}
template <typename Itr>
static constexpr auto write(
std::span<std::byte const> bytes, Itr out) noexcept {
std::copy(bytes.begin(), bytes.end(), out);
return out += bytes.size();
}
};
template <>
struct write_adapter<map_desc> {
static constexpr auto value(map_desc desc) noexcept { return desc.count; }
};
template <>
struct write_adapter<array_desc> {
static constexpr auto value(array_desc desc) noexcept { return desc.count; }
};
// TODO: These could be optimized away because invalid/nil never contain real
// data.
template <>
struct write_adapter<invalid> {
static constexpr auto value(invalid) noexcept { return 0; }
};
template <>
struct write_adapter<nil> {
static constexpr auto value(nil) noexcept { return 0; }
};
namespace detail {
template <typename T>
using expected = tl::expected<T, error>;
template <format_type F>
constexpr inline std::size_t calculate_space(
typename F::value_type val) noexcept {
// At a minimum, one byte is needed to store the format.
std::size_t size = sizeof(typename F::first_type);
if (!is_fixtype<F>) {
++size; // For format
}
if constexpr (F::payload_type == format::payload::variable) {
size += write_adapter<typename F::value_type>::size(val);
}
return size;
}
// The "first type" is either the size of the variable length payload or
// a fixed-length value. Additionally, this "first type" may be inlined
// into the marker byte if it's a fix type.
template <format_type F>
constexpr inline expected<typename F::first_type> pack_first(
typename F::value_type value) noexcept {
using value_type = typename F::value_type;
if constexpr (F::payload_type == format::payload::variable) {
return typename F::first_type(write_adapter<value_type>::size(value));
} else {
if constexpr (requires { write_adapter<value_type>::value; }) {
return typename F::first_type(
write_adapter<value_type>::value(value));
} else {
return typename F::first_type(value);
}
}
}
template <format_type F, typename Itr>
constexpr inline expected<Itr> write(
typename F::value_type&& value, Itr out, Itr const end) {
using diff_type = typename std::iterator_traits<Itr>::difference_type;
if (diff_type(calculate_space<F>(value)) > std::distance(out, end)) {
return tl::make_unexpected(error::out_of_space);
}
auto marker = F::marker;
auto result = pack_first<F>(value);
if (!result) {
return tl::make_unexpected(result.error());
}
if constexpr (is_fixtype<F>) {
if (*result > 0xff) {
return tl::make_unexpected(error::bad_value);
}
if constexpr (F::flags & format::flag::apply_mask) {
marker |= std::byte(*result);
} else {
marker = std::byte(*result);
}
if ((marker & ~F::mask) != F::marker) {
return tl::make_unexpected(error::bad_value);
}
}
*out++ = marker;
if constexpr (!is_fixtype<F>) {
out = write_adapter<typename decltype(result)::value_type>::write(
*result, out);
}
if constexpr (F::payload_type == format::payload::variable) {
out = write_adapter<typename F::value_type>::write(value, out);
}
return out;
}
template <typename T>
struct format_hint;
template <>
struct format_hint<std::uint8_t> {
using type = format::positive_fixint;
};
template <>
struct format_hint<std::uint16_t> {
using type = format::uint16;
};
} // namespace detail
class writer {
public:
template <typename T>
using expected = detail::expected<T>;
constexpr writer(std::span<std::byte> dest)
: data(dest)
, curr(std::begin(data))
, end(std::end(data)) {}
template <format_type F>
constexpr expected<tl::monostate> write(
typename F::value_type&& v) noexcept {
using value_type = typename F::value_type;
if (curr == end) return tl::make_unexpected(error::out_of_space);
auto result = detail::write<F>(std::forward<value_type>(v), curr, end);
if (!result) {
return tl::make_unexpected(result.error());
}
curr = *result;
return tl::monostate{};
}
template <typename T>
requires requires { typename detail::format_hint<T>::type; }
constexpr expected<tl::monostate> write(T&& v) {
return write<typename detail::format_hint<T>::type>(std::forward<T>(v));
}
constexpr auto pos() const noexcept { return curr; }
constexpr auto tell() const noexcept {
return std::distance(std::begin(data), curr);
}
constexpr auto subspan() const noexcept {
return std::span<std::byte>(std::begin(data), tell());
}
private:
std::span<std::byte> data;
decltype(data)::iterator curr;
decltype(data)::iterator end;
};
} // namespace msgpack
#endif // msgpack_core_writer_ce48a51aa6ed0858

View File

@ -19,6 +19,7 @@
#define session_07eae057feface79
#include "parselink/msgpack/token.h"
#include "parselink/proto/session_id.h"
#include <chrono>
#include <cstdint>
#include <functional>
@ -81,36 +82,34 @@ struct transparent_hash {
class session {
public:
using close_handle = std::function<void(std::string_view)>;
session(std::string_view user_id, close_handle hdl) noexcept;
session(std::string_view user_id) noexcept;
~session();
std::span<std::byte const> id() const noexcept { return id_; }
session_id id() const noexcept { return id_; }
std::string_view user_id() const noexcept { return user_id_; }
auto last_activity() const noexcept { return last_activity_; }
void update_last_activity(
std::chrono::steady_clock::time_point =
std::chrono::steady_clock::now()) noexcept {}
constexpr bool operator==(std::span<std::byte const> id) const noexcept {
return std::ranges::equal(id, id_);
}
std::chrono::system_clock::time_point =
std::chrono::system_clock::now()) noexcept {}
private:
std::array<std::byte, 32> id_;
session_id id_;
std::string user_id_;
close_handle closer_;
std::chrono::steady_clock::time_point last_activity_;
std::chrono::system_clock::time_point last_activity_;
};
template <>
struct transparent_hash<session>
struct transparent_hash<std::string> : transparent_hash<std::string_view> {};
template <>
struct transparent_hash<session_id>
: transparent_hash<std::span<std::byte const>> {
[[nodiscard]] auto operator()(session const& s) const {
return std::hash<std::span<std::byte const>>{}(s.id());
[[nodiscard]] auto operator()(session_id const& s) const {
return std::hash<std::span<std::byte const>>{}(s.raw());
}
};

View File

@ -0,0 +1,68 @@
//-----------------------------------------------------------------------------
// ___ __ _ _
// / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __
// / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ /
// / ___/ (_| | | \__ \ __/ /__| | | | | <
// \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ .
//
//-----------------------------------------------------------------------------
// Author: Kurt Sassenrath
// Module: Server
//
// Simple in-memory session manager.
//
// Copyright (c) 2023 Kurt Sassenrath.
//
// License TBD.
//-----------------------------------------------------------------------------
#ifndef memory_session_manager_b3851872babe001d
#define memory_session_manager_b3851872babe001d
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
#include <parselink/server/session_manager.h>
#include <unordered_set>
namespace parselink {
class memory_session_manager {
public:
memory_session_manager(boost::asio::io_context& ctx);
~memory_session_manager();
// Allocate a new session.
tl::expected<proto::session*, proto::error> create_session(
std::string_view user_id);
// Destroy an existing session.
tl::expected<tl::monostate, proto::error> destroy_session(
proto::session* session);
// Find a session by its user id.
tl::expected<proto::session*, proto::error> find(std::string_view user_id);
// Find a session by its ID.
tl::expected<proto::session*, proto::error> find(
proto::session_id const& session);
private:
std::unordered_map<std::string, proto::session,
proto::transparent_hash<std::string>, std::equal_to<>>
sessions_;
std::unordered_map<proto::session_id, proto::session*,
proto::transparent_hash<proto::session_id>, std::equal_to<>>
lookup_by_sid_;
boost::asio::io_context& ctx_;
boost::asio::io_context::strand strand_;
};
// Sanity check
static_assert(session_manager<memory_session_manager>);
static_assert(sync_session_manager<memory_session_manager>);
static_assert(!async_session_manager<memory_session_manager>);
} // namespace parselink
#endif // memory_session_manager_b3851872babe001d

View File

@ -0,0 +1,42 @@
//-----------------------------------------------------------------------------
// ___ __ _ _
// / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __
// / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ /
// / ___/ (_| | | \__ \ __/ /__| | | | | <
// \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ .
//
//-----------------------------------------------------------------------------
// Author: Kurt Sassenrath
// Module: Server
//
// Server interface.
//
// Copyright (c) 2023 Kurt Sassenrath.
//
// License TBD.
//-----------------------------------------------------------------------------
#ifndef server_5b46f075be3caa00
#define server_5b46f075be3caa00
#include <cstdint>
#include <memory>
namespace parselink {
template <typename Server>
concept server_concept = requires(Server& srv) {
{ srv.run() } -> std::same_as<std::error_code>;
};
class server {
public:
virtual ~server() = default;
virtual std::error_code run() noexcept = 0;
};
std::unique_ptr<server> make_server(std::string_view address,
std::uint16_t user_port, std::uint16_t websocket_port);
} // namespace parselink
#endif // server_5b46f075be3caa00

View File

@ -0,0 +1,69 @@
//-----------------------------------------------------------------------------
// ___ __ _ _
// / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __
// / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ /
// / ___/ (_| | | \__ \ __/ /__| | | | | <
// \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ .
//
//-----------------------------------------------------------------------------
// Author: Kurt Sassenrath
// Module: Server
//
// Session manager concept.
//
// Copyright (c) 2023 Kurt Sassenrath.
//
// License TBD.
//-----------------------------------------------------------------------------
#ifndef session_manager_779b9fc5781fb66f
#define session_manager_779b9fc5781fb66f
#include <boost/asio/awaitable.hpp>
#include <parselink/proto/session.h>
#include <parselink/proto/session_id.h>
namespace parselink {
using boost::asio::awaitable;
template <typename T>
concept sync_session_manager =
requires(T& mgr, std::string_view sv, proto::session* s) {
{
mgr.create_session(sv)
} -> std::same_as<tl::expected<proto::session*, proto::error>>;
} && requires(T& mgr, proto::session* s) {
{
mgr.destroy_session(s)
} -> std::same_as<tl::expected<tl::monostate, proto::error>>;
} && requires(T& mgr, std::string_view user_id) {
{
mgr.find(user_id)
} -> std::same_as<tl::expected<proto::session*, proto::error>>;
} && requires(T& mgr, proto::session_id const& sid) {
{
mgr.find(sid)
} -> std::same_as<tl::expected<proto::session*, proto::error>>;
};
template <typename T>
concept async_session_manager = requires(T& mgr, proto::session* s) {
{
mgr.create_session()
} -> std::same_as<awaitable<tl::expected<proto::session*, proto::error>>>;
} && requires(T& mgr, proto::session* s) {
{
mgr.destroy_session(s)
} -> std::same_as<awaitable<tl::expected<tl::monostate, proto::error>>>;
} && requires(T& mgr, proto::session_id const& sid) {
{
mgr.find(sid)
} -> std::same_as<awaitable<tl::expected<proto::session*, proto::error>>>;
};
template <typename T>
concept session_manager = sync_session_manager<T> || async_session_manager<T>;
} // namespace parselink
#endif // session_manager_779b9fc5781fb66f

View File

@ -18,6 +18,7 @@ cc_binary(
"//include/parselink:utility",
"//source/logging",
"//source/proto",
"//source/server",
"@boost//:beast",
],
)

View File

@ -188,12 +188,11 @@ tl::expected<connect_info, error> proto::parse_connect(
}
session::session(std::string_view user_id, close_handle hdl) noexcept
: id_(get_random_bytes<32>())
: id_()
, user_id_(std::string{user_id})
, closer_(std::move(hdl))
, last_activity_(std::chrono::steady_clock::now()) {
logger.debug("New session with id {} created for {} {:x}", id_, user_id_,
transparent_hash<session>{}(*this));
, last_activity_(std::chrono::system_clock::now()) {
logger.debug("New session with id {} created for {}", id_.raw(), user_id_);
}
session::~session() {

View File

@ -21,6 +21,7 @@
#include "parselink/server.h"
#include "parselink/logging.h"
#include "parselink/msgpack/core/writer.h"
#include "parselink/msgpack/token/reader.h"
#include "parselink/msgpack/token/views.h"
#include "parselink/proto/session.h"
@ -152,6 +153,8 @@ constexpr auto no_ex_coro = net::as_tuple(use_awaitable);
constexpr auto no_ex_defer = net::as_tuple(deferred);
} // namespace
#include <parselink/server/memory_session_manager.h>
class user_connection;
class monolithic_server : public server {
@ -168,12 +171,9 @@ public:
private:
awaitable<void> user_listen();
std::unordered_set<proto::session, proto::transparent_hash<proto::session>,
std::equal_to<>>
sessions_;
net::io_context io_context_;
net::io_context::strand session_strand_;
memory_session_manager session_mgr_;
net::ip::address addr_;
std::uint16_t user_port_;
std::uint16_t websocket_port_;
@ -273,20 +273,28 @@ public:
co_return;
}
co_await server_.create_session(shared_from_this(), *connect_info);
auto session = co_await server_.create_session(
shared_from_this(), *connect_info);
if (!session) {
co_return;
}
session_ = *session;
auto writer = msgpack::writer(msgbuf);
}
enum class state { init, authenticated, active };
monolithic_server& server_;
net::ip::tcp::socket socket_;
proto::session* session_;
proto::session* session_{};
};
monolithic_server::monolithic_server(std::string_view address,
std::uint16_t user_port, std::uint16_t websocket_port)
: io_context_{1}
, session_strand_(io_context_)
, session_mgr_(io_context_)
, addr_(net::ip::address::from_string(std::string{address}))
, user_port_{user_port}
, websocket_port_{websocket_port} {
@ -327,11 +335,9 @@ monolithic_server::create_session(std::shared_ptr<user_connection> const& conn,
// Move to session strand.
co_await net::post(session_strand_,
net::bind_executor(session_strand_, use_awaitable));
proto::session session(info.user_id, {});
// Now, update the map of sessions
co_return tl::make_unexpected(proto::error::unsupported);
auto session = session_mgr_.create_session(info.user_id);
logger.info("Created session: {}", session);
co_return session;
}
std::unique_ptr<server> parselink::make_server(std::string_view address,

20
source/server/BUILD Normal file
View File

@ -0,0 +1,20 @@
# parselink
cc_library(
name = "server",
srcs = [
"memory_session_manager.cpp",
],
deps = [
"//include/parselink:proto",
"//include/parselink:msgpack",
"//include/parselink:server",
"//source/logging",
"@hydrogen",
"@boost//:asio",
],
visibility = [
# TODO: Fix visibility
"//visibility:public",
],
)

View File

@ -0,0 +1,84 @@
//-----------------------------------------------------------------------------
// ___ __ _ _
// / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __
// / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ /
// / ___/ (_| | | \__ \ __/ /__| | | | | <
// \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ .
//
//-----------------------------------------------------------------------------
// Author: Kurt Sassenrath
// Module: Server
//
// Simple in-memory session manager.
//
// Copyright (c) 2023 Kurt Sassenrath.
//
// License TBD.
//-----------------------------------------------------------------------------
#include <parselink/logging.h>
#include <parselink/server/memory_session_manager.h>
#include <fmt/chrono.h>
using namespace parselink;
using namespace std::chrono_literals;
namespace {
logging::logger logger("memory_session_manager");
constexpr std::size_t max_open_sessions = 1000;
constexpr auto expires_time = 1min;
} // namespace
memory_session_manager::memory_session_manager(boost::asio::io_context& ctx)
: ctx_(ctx)
, strand_(ctx) {
logger.trace("Creating: {}", this);
}
memory_session_manager::~memory_session_manager() {
logger.trace("Destroying: {}", this);
}
tl::expected<proto::session*, proto::error>
memory_session_manager::create_session(std::string_view user_id) {
auto [itr, inserted] = sessions_.try_emplace(std::string{user_id}, user_id);
if (!inserted) {
logger.debug("Session already found for {}, last activity"
" {:%Y-%m-%d %H:%M:%S}",
user_id, fmt::gmtime(itr->second.last_activity()));
if (itr->second.last_activity() + expires_time
< std::chrono::system_clock::now()) {
logger.debug("Session expired. Creating new session");
itr->second = proto::session{user_id};
}
return &itr->second;
}
lookup_by_sid_.emplace(itr->second.id(), &itr->second);
return &itr->second;
}
tl::expected<tl::monostate, proto::error>
memory_session_manager::destroy_session(proto::session* s) {
return tl::make_unexpected(proto::error::unsupported);
}
tl::expected<proto::session*, proto::error> memory_session_manager::find(
std::string_view user_id) {
auto x = sessions_.find(user_id);
if (x != sessions_.end()) {
return &x->second;
}
return tl::make_unexpected(proto::error::unsupported);
}
tl::expected<proto::session*, proto::error> memory_session_manager::find(
proto::session_id const& sid) {
auto x = lookup_by_sid_.find(sid);
if (x != lookup_by_sid_.end()) {
return x->second;
}
return tl::make_unexpected(proto::error::unsupported);
}