WIP Message
This commit is contained in:
parent
f894b8eaf6
commit
9164e7dc60
@ -29,7 +29,8 @@ cc_binary(
|
|||||||
],
|
],
|
||||||
deps = [
|
deps = [
|
||||||
"headers",
|
"headers",
|
||||||
|
"msgpack",
|
||||||
"@boost//:beast",
|
"@boost//:beast",
|
||||||
"//source/common"
|
"//source/common",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|||||||
78
source/include/parselink/message/message.h
Normal file
78
source/include/parselink/message/message.h
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
//-----------------------------------------------------------------------------
|
||||||
|
// ___ __ _ _
|
||||||
|
// / _ \__ _ _ __ ___ ___ / /(_)_ __ | | __
|
||||||
|
// / /_)/ _` | '__/ __|/ _ \/ / | | '_ \| |/ /
|
||||||
|
// / ___/ (_| | | \__ \ __/ /__| | | | | <
|
||||||
|
// \/ \__,_|_| |___/\___\____/_|_| |_|_|\_\ .
|
||||||
|
//
|
||||||
|
//-----------------------------------------------------------------------------
|
||||||
|
// Author: Kurt Sassenrath
|
||||||
|
// Module: Message
|
||||||
|
//
|
||||||
|
// Message types.
|
||||||
|
//
|
||||||
|
// Copyright (c) 2023 Kurt Sassenrath.
|
||||||
|
//
|
||||||
|
// License TBD.
|
||||||
|
//-----------------------------------------------------------------------------
|
||||||
|
#ifndef message_0c61530748b9f966
|
||||||
|
#define message_0c61530748b9f966
|
||||||
|
|
||||||
|
#include <span>
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
|
#include <tl/expected.hpp>
|
||||||
|
|
||||||
|
namespace parselink {
|
||||||
|
namespace message {
|
||||||
|
|
||||||
|
enum class error {
|
||||||
|
bad_magic, // Did not get the message magic expected
|
||||||
|
too_large, // The message size was too large.
|
||||||
|
unknown_type, // The message type is not known
|
||||||
|
};
|
||||||
|
|
||||||
|
// Parselink messages are encoded with MessagePack and take the form of:
|
||||||
|
// | magic | size | content |
|
||||||
|
// - [magic] is the string "prs", in "fixstr" format.
|
||||||
|
// - [size] is the size of the message, not including this header.
|
||||||
|
// - [content] is [size] bytes of MessagePack data, including the
|
||||||
|
// specific type of message presented.
|
||||||
|
|
||||||
|
// This class is responsible for consuming buffer data and yielding a message
|
||||||
|
// instance when complete. Will throw an error if data is incorrect.
|
||||||
|
//
|
||||||
|
class builder {
|
||||||
|
public:
|
||||||
|
// For now, builders don't manage any buffers themselves. Later, that
|
||||||
|
// may change.
|
||||||
|
builder() = default;
|
||||||
|
|
||||||
|
// Reset the builder to its initial state. This means any partially-decoded
|
||||||
|
// message data will be lost.
|
||||||
|
void reset() noexcept;
|
||||||
|
|
||||||
|
// How many bytes are needed to perform a meaningful amount of work.
|
||||||
|
std::size_t bytes_needed() noexcept;
|
||||||
|
|
||||||
|
// Process data from a buffer, building messages. Returns the number of
|
||||||
|
// bytes read from the buffer for the caller's bookkeeping. May yield a
|
||||||
|
// message in addition.
|
||||||
|
std::size_t process(std::span<std::byte const> buffer) noexcept;
|
||||||
|
|
||||||
|
private:
|
||||||
|
enum class state {
|
||||||
|
magic,
|
||||||
|
size,
|
||||||
|
payload
|
||||||
|
};
|
||||||
|
|
||||||
|
state state_{state::magic};
|
||||||
|
std::size_t payload_size_{};
|
||||||
|
std::size_t payload_remaining_{};
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace message
|
||||||
|
} // namespace parselink
|
||||||
|
|
||||||
|
#endif // message_0c61530748b9f966
|
||||||
@ -251,6 +251,7 @@ public:
|
|||||||
// fully parse a token, then incomplete_message is returned.
|
// fully parse a token, then incomplete_message is returned.
|
||||||
constexpr tl::expected<std::span<token>, error> read_some(
|
constexpr tl::expected<std::span<token>, error> read_some(
|
||||||
std::span<token> token_buffer) noexcept;
|
std::span<token> token_buffer) noexcept;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::span<std::byte const> data_;
|
std::span<std::byte const> data_;
|
||||||
decltype(data_)::iterator curr_;
|
decltype(data_)::iterator curr_;
|
||||||
|
|||||||
0
source/message/builder.cpp
Normal file
0
source/message/builder.cpp
Normal file
@ -19,7 +19,7 @@
|
|||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
|
|
||||||
#include <logging.h>
|
#include <logging.h>
|
||||||
#include <server.h>
|
#include <parselink/server.h>
|
||||||
|
|
||||||
#include <boost/asio/io_context.hpp>
|
#include <boost/asio/io_context.hpp>
|
||||||
#include <boost/asio/signal_set.hpp>
|
#include <boost/asio/signal_set.hpp>
|
||||||
@ -91,6 +91,41 @@ namespace {
|
|||||||
constexpr auto no_ex_defer = net::as_tuple(deferred);
|
constexpr auto no_ex_defer = net::as_tuple(deferred);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class user_session : public std::enable_shared_from_this<user_session> {
|
||||||
|
public:
|
||||||
|
user_session(net::ip::tcp::socket sock) : socket_(std::move(sock)) {}
|
||||||
|
~user_session() {
|
||||||
|
logger.debug("Closing connection to {}", socket_.remote_endpoint());
|
||||||
|
}
|
||||||
|
|
||||||
|
void start() {
|
||||||
|
logger.debug("New connection from {}", socket_.remote_endpoint());
|
||||||
|
co_spawn(socket_.get_executor(), [self = shared_from_this()]{
|
||||||
|
return self->reader();
|
||||||
|
}, detached);
|
||||||
|
}
|
||||||
|
|
||||||
|
awaitable<void> reader() {
|
||||||
|
std::array<std::byte, 4096> buffer;
|
||||||
|
auto [ec, n] = co_await socket_.async_read_some(
|
||||||
|
net::buffer(buffer), no_ex_coro);
|
||||||
|
if (ec) {
|
||||||
|
logger.error("Reading from user socket failed: {}", ec);
|
||||||
|
co_return;
|
||||||
|
}
|
||||||
|
logger.info("Read {} bytes from client: {}", n,
|
||||||
|
std::string_view(reinterpret_cast<char*>(buffer.data()), n));
|
||||||
|
}
|
||||||
|
|
||||||
|
enum class state {
|
||||||
|
init,
|
||||||
|
authenticated,
|
||||||
|
active
|
||||||
|
};
|
||||||
|
|
||||||
|
net::ip::tcp::socket socket_;
|
||||||
|
};
|
||||||
|
|
||||||
class monolithic_server : public server {
|
class monolithic_server : public server {
|
||||||
public:
|
public:
|
||||||
monolithic_server(std::string_view address, std::uint16_t user_port,
|
monolithic_server(std::string_view address, std::uint16_t user_port,
|
||||||
@ -100,7 +135,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
awaitable<void> echo(net::ip::tcp::socket socket);
|
|
||||||
awaitable<void> user_listen();
|
awaitable<void> user_listen();
|
||||||
|
|
||||||
net::io_context io_context_;
|
net::io_context io_context_;
|
||||||
@ -120,29 +154,12 @@ monolithic_server::monolithic_server(std::string_view address,
|
|||||||
"websocket_port = {})", address, user_port_, websocket_port_);
|
"websocket_port = {})", address, user_port_, websocket_port_);
|
||||||
}
|
}
|
||||||
|
|
||||||
awaitable<void> monolithic_server::echo(net::ip::tcp::socket socket) {
|
|
||||||
std::array<std::byte, 4096> buffer;
|
|
||||||
while (true) {
|
|
||||||
auto [ec, n] = co_await socket.async_read_some(net::buffer(buffer), no_ex_coro);
|
|
||||||
if (ec) {
|
|
||||||
logger.error("Read from socket failed: {}", ec);
|
|
||||||
co_return;
|
|
||||||
}
|
|
||||||
auto [ec2, x] = co_await net::async_write(socket, net::buffer(buffer, n), no_ex_coro);
|
|
||||||
if (ec2) {
|
|
||||||
logger.error("Write to socket failed: {}", ec);
|
|
||||||
co_return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
awaitable<void> monolithic_server::user_listen() {
|
awaitable<void> monolithic_server::user_listen() {
|
||||||
auto exec = co_await net::this_coro::executor;
|
auto exec = co_await net::this_coro::executor;
|
||||||
net::ip::tcp::acceptor acceptor{exec, {addr_, user_port_}};
|
net::ip::tcp::acceptor acceptor{exec, {addr_, user_port_}};
|
||||||
while (true) {
|
while (true) {
|
||||||
auto socket = co_await acceptor.async_accept(use_awaitable);
|
std::make_shared<user_session>(
|
||||||
logger.debug("Accepted new connection from {}", socket.remote_endpoint());
|
co_await acceptor.async_accept(use_awaitable))->start();
|
||||||
co_spawn(exec, echo(std::move(socket)), detached);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user