WIP: Add hydrogen, session_id

This commit is contained in:
Kurt Sassenrath 2023-11-06 07:06:19 -08:00
parent 8f8066c243
commit 1ceccd0720
6 changed files with 233 additions and 63 deletions

View File

@ -1,6 +1,7 @@
workspace(name = "parselink")
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")
#===============================================================================
# Imported Bazel modules
@ -76,6 +77,29 @@ cc_library(
"""
)
#-------------------------------------------------------------------------------
# libhydrogen: A lightweight cryptography library
#-------------------------------------------------------------------------------
hydrogen_commit = "c382cbb"
hydrogen_base_url = \
"https://github.com/jedisct1/libhydrogen"
hydrogen_branch = "master"
git_repository(
name = "hydrogen",
remote = hydrogen_base_url,
commit = hydrogen_commit,
build_file_content =
"""
cc_library(
name = "hydrogen",
srcs = glob(["hydrogen.c", "impl/**/*.h"]),
hdrs = ["hydrogen.h"],
visibility = ["//visibility:public"],
)
""",
)
#-------------------------------------------------------------------------------
# ut: Unit test framework.
# TODO(kss): Only if tests are needed?

View File

@ -19,12 +19,28 @@
#define session_07eae057feface79
#include "parselink/msgpack/token.h"
#include <chrono>
#include <cstdint>
#include <functional>
#include <span>
#include <string>
#include <tl/expected.hpp>
template <>
struct std::hash<std::span<std::byte const>> {
constexpr static std::uint32_t seed_var = 0x811c9dc5;
constexpr static std::uint32_t factor = 0x01000193;
constexpr auto operator()(std::span<std::byte const> data) const noexcept {
std::uint32_t digest = seed_var * factor;
for (auto byte : data) {
digest = (digest ^ static_cast<std::uint32_t>(byte)) * factor;
}
return digest >> 8;
}
};
namespace parselink {
namespace proto {
@ -40,6 +56,8 @@ enum class error {
struct header_info {
std::uint32_t message_size; // Size of the message, minus the header.
std::uint32_t bytes_read; // How many bytes of the buffer were used.
std::uint32_t bytes_parsed; // How many bytes were parsed as part of the
// header.
};
struct connect_info {
@ -48,6 +66,54 @@ struct connect_info {
std::span<std::byte const> session_id;
};
template <typename T>
struct transparent_hash {
using is_transparent = void;
using type = T;
using hash_type = std::hash<type>;
template <typename Arg>
[[nodiscard]] constexpr auto operator()(Arg&& arg) const {
return hash_type{}(std::forward<Arg>(arg));
}
};
class session {
public:
using close_handle = std::function<void(std::string_view)>;
session(std::string_view user_id, close_handle hdl) noexcept;
~session();
std::span<std::byte const> id() const noexcept { return id_; }
std::string_view user_id() const noexcept { return user_id_; }
auto last_activity() const noexcept { return last_activity_; }
void update_last_activity(
std::chrono::steady_clock::time_point =
std::chrono::steady_clock::now()) noexcept {}
constexpr bool operator==(std::span<std::byte const> id) const noexcept {
return std::ranges::equal(id, id_);
}
private:
std::array<std::byte, 32> id_;
std::string user_id_;
close_handle closer_;
std::chrono::steady_clock::time_point last_activity_;
};
template <>
struct transparent_hash<session>
: transparent_hash<std::span<std::byte const>> {
[[nodiscard]] auto operator()(session const& s) const {
return std::hash<std::span<std::byte const>>{}(s.id());
}
};
// Parse the protocol header out of a buffer.
tl::expected<header_info, error> parse_header(
std::span<std::byte const> buffer) noexcept;

View File

@ -0,0 +1,49 @@
//-----------------------------------------------------------------------------
// ___ __ _ _
// / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __
// / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ /
// / ___/ (_| | | \__ \ __/ /__| | | | | <
// \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ .
//
//-----------------------------------------------------------------------------
// Author: Kurt Sassenrath
// Module: proto
//
// Session ID. Used as a handle to access an existing user session, which can
// also be used to share parse data without any linking of users.
//
// Copyright (c) 2023 Kurt Sassenrath.
//
// License TBD.
//-----------------------------------------------------------------------------
#ifndef session_id_6598f9bae1cbb501
#define session_id_6598f9bae1cbb501
#include <array>
#include <cstddef>
#include <span>
namespace parselink {
namespace proto {
struct session_id {
std::array<std::byte, 32> bytes;
session_id() noexcept;
[[nodiscard]] constexpr auto operator<=>(
session_id const& other) const noexcept {
return bytes <=> other.bytes;
}
[[nodiscard]] constexpr auto operator<=>(
std::span<std::byte const> other) const noexcept {
return std::lexicographical_compare_three_way(bytes.begin(),
bytes.end(), other.begin(), other.end(),
std::compare_three_way());
}
};
} // namespace proto
} // namespace parselink
#endif // session_id_6598f9bae1cbb501

View File

@ -9,6 +9,7 @@ cc_library(
"//include/parselink:proto",
"//include/parselink:msgpack",
"//source/logging",
"@hydrogen",
],
visibility = [
# TODO: Fix visibility

View File

@ -19,11 +19,28 @@
//-----------------------------------------------------------------------------
#include "parselink/proto/session.h"
#include "hydrogen.h"
#include "parselink/logging.h"
#include "parselink/msgpack/token.h"
#include <fmt/ranges.h>
namespace {
void ensure_initialized() {
static auto x [[maybe_unused]] = [] { return hydro_init(); }();
}
template <std::size_t N>
auto get_random_bytes() {
ensure_initialized();
std::array<std::byte, N> out;
hydro_random_buf(out.data(), N);
return out;
}
} // anonymous namespace
using namespace parselink;
using namespace parselink::proto;
@ -169,3 +186,16 @@ tl::expected<connect_info, error> proto::parse_connect(
return tl::make_unexpected(error::bad_data);
}
}
session::session(std::string_view user_id, close_handle hdl) noexcept
: id_(get_random_bytes<32>())
, user_id_(std::string{user_id})
, closer_(std::move(hdl))
, last_activity_(std::chrono::steady_clock::now()) {
logger.debug("New session with id {} created for {} {:x}", id_, user_id_,
transparent_hash<session>{}(*this));
}
session::~session() {
if (closer_) closer_("Destroyed");
}

View File

@ -36,7 +36,7 @@
#include <boost/asio/signal_set.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/write.hpp>
#include <map>
#include <unordered_set>
#include <fmt/ranges.h>
@ -154,13 +154,6 @@ constexpr auto no_ex_defer = net::as_tuple(deferred);
class user_connection;
struct user_session {
std::weak_ptr<user_connection> conn;
std::string user_id;
std::array<std::byte, 32> session_id;
std::chrono::system_clock::time_point expires_at;
};
class monolithic_server : public server {
public:
monolithic_server(std::string_view address, std::uint16_t user_port,
@ -168,16 +161,16 @@ public:
std::error_code run() noexcept override;
net::awaitable<tl::expected<tl::monostate, proto::error>> create_session(
std::weak_ptr<user_connection> conn,
net::awaitable<tl::expected<proto::session*, proto::error>> create_session(
std::shared_ptr<user_connection> const& conn,
proto::connect_info const& info);
private:
friend user_session;
awaitable<void> user_listen();
std::map<std::string, user_session, std::less<>> active_sessions_;
std::unordered_set<proto::session, proto::transparent_hash<proto::session>,
std::equal_to<>>
sessions_;
net::io_context io_context_;
net::io_context::strand session_strand_;
@ -192,7 +185,9 @@ public:
: server_(server)
, socket_(std::move(sock)) {}
~user_connection() {
~user_connection() { stop(); }
void stop() {
logger.debug("Connection to {} closed.", socket_.remote_endpoint());
boost::system::error_code ec;
socket_.shutdown(net::ip::tcp::socket::shutdown_both, ec);
@ -207,81 +202,85 @@ public:
detached);
}
awaitable<tl::expected<std::monostate, boost::system::error_code>>
buffer_message(std::span<std::byte> buffer) noexcept {
awaitable<tl::expected<tl::monostate, proto::error>> read_message(
std::vector<std::byte>& buffer) noexcept {
// Use a small buffer on the stack to read the initial header.
std::array<std::byte, 8> hdrbuff;
auto [ec, n] = co_await socket_.async_read_some(
net::buffer(hdrbuff), no_ex_coro);
if (ec) {
logger.error("Reading hdr from user socket failed: {}", ec);
co_return tl::make_unexpected(proto::error::system_error);
}
logger.debug("Read {} bytes from client: {}", n,
std::span(hdrbuff.data(), n));
auto maybe_hdr = proto::parse_header(std::span(hdrbuff.data(), n));
if (!maybe_hdr) {
logger.error("Unable to parse header: {}", maybe_hdr.error());
co_return tl::make_unexpected(maybe_hdr.error());
}
// TODO(ksassenrath): Replace with specialized allocator.
buffer.resize(maybe_hdr->message_size);
// Copy remaining portion of message in initial read to the message
// buffer.
std::copy(std::next(hdrbuff.begin(), maybe_hdr->bytes_read),
std::next(hdrbuff.begin(), n), buffer.begin());
auto span = std::span(
buffer.begin() + n - maybe_hdr->bytes_read, buffer.end());
// Buffer remaining message.
std::size_t amt = 0;
while (amt < buffer.size()) {
auto subsp = buffer.subspan(amt);
while (amt < span.size()) {
auto subsp = span.subspan(amt);
auto [ec, n] = co_await socket_.async_read_some(
net::buffer(subsp), no_ex_coro);
logger.debug("Read {} bytes, total is now {}", n, amt + n);
if (ec || n == 0) {
logger.error("Reading from user socket failed: {}", ec);
co_return tl::make_unexpected(ec);
logger.error("Reading msg from user socket failed: {}", ec);
co_return tl::make_unexpected(proto::error::system_error);
}
amt += n;
}
co_return std::monostate{};
co_return tl::monostate{};
}
awaitable<void> await_connect() noexcept {
// Use a small buffer on the stack to read the initial header.
std::array<std::byte, 8> buffer;
auto [ec, n] = co_await socket_.async_read_some(
net::buffer(buffer), no_ex_coro);
std::vector<std::byte> msgbuf;
if (ec) {
logger.error("Reading from user socket failed: {}", ec);
if (auto maybe_msg = co_await read_message(msgbuf); !maybe_msg) {
logger.debug("returning");
co_return;
}
logger.debug("Read {} bytes from client: {}", n,
std::span(buffer.data(), n));
auto maybe_hdr = proto::parse_header(std::span(buffer.data(), n));
if (!maybe_hdr) {
logger.error("Unable to parse header: {}", maybe_hdr.error());
co_return;
}
// TODO(ksassenrath): Replace with specialized allocator.
auto msg = std::vector<std::byte>(maybe_hdr->message_size);
// Copy remaining portion of message in initial read to the message
// buffer.
std::copy(std::next(buffer.begin(), maybe_hdr->bytes_read),
std::next(buffer.begin(), n), msg.begin());
auto msg_span =
std::span(msg.begin() + n - maybe_hdr->bytes_read, msg.end());
if (auto result = co_await buffer_message(msg_span); !result) {
logger.error("Unable to parse header: {}", result.error());
co_return;
}
auto reader = msgpack::token_reader(msg);
auto reader = msgpack::token_reader(msgbuf);
std::array<msgpack::token, 32> tokens;
auto session =
auto connect_info =
reader.read_some(tokens)
.map_error([](auto) { return proto::error::bad_data; })
.and_then(proto::parse_connect);
if (!session) {
logger.error("Session failed: {}", session.error());
if (!connect_info) {
logger.error("Session failed: {}", connect_info.error());
co_return;
}
co_await server_.create_session(
std::weak_ptr<user_connection>(shared_from_this()), *session);
co_await server_.create_session(shared_from_this(), *connect_info);
}
enum class state { init, authenticated, active };
monolithic_server& server_;
net::ip::tcp::socket socket_;
proto::session* session_;
};
monolithic_server::monolithic_server(std::string_view address,
@ -322,15 +321,16 @@ std::error_code monolithic_server::run() noexcept {
return {};
}
net::awaitable<tl::expected<tl::monostate, proto::error>>
monolithic_server::create_session(std::weak_ptr<user_connection> conn,
proto::connect_info const& session) {
net::awaitable<tl::expected<proto::session*, proto::error>>
monolithic_server::create_session(std::shared_ptr<user_connection> const& conn,
proto::connect_info const& info) {
// Move to session strand.
co_await net::post(session_strand_,
net::bind_executor(session_strand_, use_awaitable));
// Pretend that there's no open
proto::session session(info.user_id, {});
// Now, update the map of sessions
co_return tl::make_unexpected(proto::error::unsupported);
}