Skip to content
Merged

Dev #88

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Added

- Operations such as ``-=`` and ``++`` for store variables in C++.
- YAML export of store meta-data.
- ``stored::MuxLayer`` to multiplex multiple protocol layers over a single connection.

Changed
```````
Expand All @@ -34,6 +35,11 @@ Changed
on ``keepAlive()``.
- Improve reconnection behavior on protocol layers.

Fixed
`````

- Init value of CRC32 in Python ``libstored.protocol.Crc32Layer``.

.. _Unreleased: https://github.com/DEMCON/libstored/compare/v2.0.0...HEAD


Expand Down
81 changes: 72 additions & 9 deletions examples/lossy_sync/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,27 @@
#include <getopt_mini.h>

enum {
// Interval in between polling of the sockets.
PollInterval_ms = 100,

// Interval to check if the Synchronizers need to send out updates to the other party.
SyncInterval_ms = PollInterval_ms * 5,
IdleTimeout_ms = SyncInterval_ms,
DisconnectTimeout_ms = IdleTimeout_ms * 10,

// Interval to retransmit unacknowledged packets over the lossy line.
// Do this more often than SyncInterval_ms to avoid delays.
RetransmitInterval_ms = PollInterval_ms * 3,

// Timeout value to send out a keep-alive packet.
// In this application, this should not be needed, as synchronization is faster.
IdleTimeout_ms = SyncInterval_ms * 2,

// Timeout until we give up on the connection.
DisconnectTimeout_ms = IdleTimeout_ms * 5,

// Update the heartbeat value in the store.
HeartbeatInterval_ms = 1000,

// Delay before trying to reconnect after a disconnection.
ReconnectDelay_ms = DisconnectTimeout_ms + IdleTimeout_ms * 2,
};

Expand Down Expand Up @@ -283,6 +299,13 @@ class SyncStack {
if(verbose)
wrap<stored::PrintLayer>(stdout, "sync");

// Add another channel to send pings.
auto mux = wrap<stored::MuxLayer>();
auto ch1_print = alloc<stored::PrintLayer>(stdout, "chan");
m_ch1 = alloc<stored::ProtocolLayer>();
m_ch1->wrap(*ch1_print);
mux.get()->map(1, *m_ch1);

// We don't want to do ARQ on large messages, so we segment them to some
// appropriate size.
wrap<stored::SegmentationLayer>(32U);
Expand Down Expand Up @@ -319,24 +342,25 @@ class SyncStack {
wrap<stored::PrintLayer>(stdout, "raw");

// Connect to I/O.
m_zmqLayer.wrap(*m_layers.back());
m_zmqLayer.wrap(*m_stack.back());

// Register the store...
m_synchronizer.map(store);
// ...and the protocol stack.
m_synchronizer.connect(**m_layers.begin());
m_synchronizer.connect(**m_stack.begin());

// There we go!
auto now = std::chrono::steady_clock::now();
m_idleUpSince = now;
m_idleDownSince = now;
m_lastRetransmit = now;
m_lastSync = now;
m_lastHeartbeat = now;
m_heartbeat = server ? store.server_heartbeat.variable()
: store.client_heartbeat.variable();

if(!server) {
m_synchronizer.syncFrom(store, *m_layers.front());
m_synchronizer.syncFrom(store, *m_stack.front());
m_connected = true;
m_arq->keepAlive();
}
Expand All @@ -354,6 +378,7 @@ class SyncStack {
recv();
doSync(now);
checkRetransmit(now);
checkIdle(now);
checkDisconnect(now);
doHeartbeat(now);

Expand All @@ -366,16 +391,25 @@ class SyncStack {
}

protected:
template <typename T, typename... Args>
std::shared_ptr<T> alloc(Args&&... args)
{
auto* p = new T{std::forward<Args>(args)...};
std::shared_ptr<T> layer{p};
m_layers.emplace_back(layer);
return layer;
}

template <typename T, typename... Args>
std::shared_ptr<T> wrap(Args&&... args)
{
auto* p = new T{std::forward<Args>(args)...};
std::shared_ptr<T> layer{p};

if(!m_layers.empty())
layer->wrap(*m_layers.back());
if(!m_stack.empty())
layer->wrap(*m_stack.back());

m_layers.emplace_back(layer);
m_stack.emplace_back(layer);
return layer;
}

Expand Down Expand Up @@ -404,6 +438,26 @@ class SyncStack {

void checkRetransmit(std::chrono::time_point<std::chrono::steady_clock> const& now)
{
// Check if we need to retransmit messages that have not been acked yet.

if(!connected())
return;

if(m_idle->idleDown()) {
auto dt = now - m_lastRetransmit;
if(dt > std::chrono::milliseconds(RetransmitInterval_ms)) {
m_arq->process();
m_lastRetransmit = now;
}
} else {
m_lastRetransmit = now;
}
}

void checkIdle(std::chrono::time_point<std::chrono::steady_clock> const& now)
{
// Check if we need to send out a keep-alive message once in a while.

if(!connected())
return;

Expand Down Expand Up @@ -442,7 +496,13 @@ class SyncStack {
auto dt = now - m_lastHeartbeat;
if(dt >= std::chrono::milliseconds(HeartbeatInterval_ms)) {
m_lastHeartbeat = now;
m_heartbeat++;
auto h = m_heartbeat++;

if(connected()) {
char buf[32];
snprintf(buf, sizeof(buf), "ping %u", h);
m_ch1->encode(buf, strlen(buf), true);
}
}
}

Expand All @@ -467,11 +527,14 @@ class SyncStack {
stored::Synchronizer m_synchronizer;
std::shared_ptr<stored::ArqLayer> m_arq;
std::shared_ptr<stored::IdleCheckLayer> m_idle;
std::shared_ptr<stored::ProtocolLayer> m_ch1;
std::list<std::shared_ptr<stored::ProtocolLayer>> m_layers;
std::list<std::shared_ptr<stored::ProtocolLayer>> m_stack;
stored::ZmqLayer m_zmqLayer;
stored::PollableZmqSocket m_pollable{m_zmqLayer.socket(), stored::Pollable::PollIn};
std::chrono::time_point<std::chrono::steady_clock> m_idleUpSince;
std::chrono::time_point<std::chrono::steady_clock> m_idleDownSince;
std::chrono::time_point<std::chrono::steady_clock> m_lastRetransmit;
std::chrono::time_point<std::chrono::steady_clock> m_lastSync;
std::chrono::time_point<std::chrono::steady_clock> m_lastHeartbeat;
stored::Variable<uint32_t, ExampleSync> m_heartbeat;
Expand Down
105 changes: 105 additions & 0 deletions include/libstored/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ class ArqLayer : public ProtocolLayer {
virtual void disconnected() override;
bool isConnected() const;
void keepAlive();
int process();

enum Event {
/*!
Expand Down Expand Up @@ -1361,6 +1362,110 @@ make_callback(Up&& up, Down&& down, Connected&& connected, Disconnected&& discon
}
# endif // C++11

/*!
* \brief A that multiplexes between protocol stacks.
*
* Channel data is prefixed by a one-byte channel identifier. These channels can be split by the
* receiver to separate stacks. Channel ID 0 is used for the stack above the MuxLayer itself.
*
* One can use this layer to multiplex different protocols or logging output over the same physical
* channel. In case a lossless channel is used, the following stack could be used:
*
* channel 0:
* - Debugger
* - SegmentationLayer
* - AsciiEscapeLayer
* - TerminalLayer
*
* channel 1:
* - PrintLayer
*
* - MuxLayer
* - some lossless transport layer
*
* In this case, the MuxLayer passes through a stream of bytes, so framing is required in the
* channel 0 stack. The bytes of channel 1 are just printed for logging.
*
* When having a lossy channel, add an ArqLayer below the MuxLayer.
*
* channel 0
* - Debugger
*
* channel 1
* - PrintLayer
*
* - MuxLayer
* - SegmentationLayer
* - Crc32Layer
* - ArqLayer
* - AsciiEscapeLayer
* - TerminalLayer
* - some lossy transport layer
*
* Now, the framing is done below the ArqLayer, so all \c decode()s get full frames above the
* MuxLayer. Therefore, no framing is required in channel 0.
*/
class MuxLayer : public ProtocolLayer {
STORED_CLASS_NOCOPY(MuxLayer)
public:
typedef ProtocolLayer base;
typedef uint8_t ChannelId;

static char const Esc = '\x10'; // DLE
static char const Repeat = '\x15'; // NAK

explicit MuxLayer(ProtocolLayer* up = nullptr, ProtocolLayer* down = nullptr);
virtual ~MuxLayer() override;

# if STORED_cplusplus >= 201103L
MuxLayer(std::initializer_list<std::reference_wrapper<ProtocolLayer>> layers);
void map(std::initializer_list<std::reference_wrapper<ProtocolLayer>> layers);
# endif // C++11

void map(ChannelId channel, ProtocolLayer& layer);
void unmap(ChannelId channel);
void unmap();

virtual void decode(void* buffer, size_t len) override;
virtual void encode(void const* buffer, size_t len, bool last = true) override;
# ifndef DOXYGEN
using base::encode;
# endif
virtual void reset() override;
virtual void connected() override;
virtual void disconnected() override;
virtual size_t mtu() const override;

protected:
void decode_(void* buffer, size_t len);
void encode_(ChannelId channel, void const* buffer, size_t len, bool last = true);

class Channel final : public ProtocolLayer {
STORED_CLASS_NOCOPY(Channel)
public:
typedef ProtocolLayer base;
Channel(MuxLayer& mux, ChannelId channel, ProtocolLayer& up);
virtual ~Channel() override is_default

virtual void encode(void const* buffer, size_t len, bool last = true) override;
virtual size_t mtu() const override;

private:
MuxLayer* m_mux;
ChannelId m_channel;
};

private:
ssize_t channelIndex(ChannelId id) const;
ProtocolLayer* channel(ChannelId id);

private:
Vector<Channel*>::type m_channels;
ChannelId m_encodingChannel;
ProtocolLayer* m_decodingChannel;
bool m_decodingEsc;
};

namespace impl {
class Loopback1 final : public ProtocolLayer {
STORED_CLASS_NOCOPY(Loopback1)
Expand Down
2 changes: 1 addition & 1 deletion python/libstored/protocol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ class Crc32Layer(ProtocolLayer):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._crc = crcmod.mkCrcFun(0x104c11db7, 0xffffffff, True, 0xffffffff)
self._crc = crcmod.mkCrcFun(0x104c11db7, 0, True, 0xffffffff)

async def encode(self, data: ProtocolLayer.Packet) -> None:
if isinstance(data, str):
Expand Down
6 changes: 6 additions & 0 deletions sphinx/doc/cpp_protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ The inheritance of the layers is shown below.
ProtocolLayer <|-- PrintLayer
ProtocolLayer <|-- IdleCheckLayer
ProtocolLayer <|-- CallbackLayer
ProtocolLayer <|-- MuxLayer

abstract ArqLayer
SegmentationLayer -[hidden]--> ArqLayer
Expand Down Expand Up @@ -208,6 +209,11 @@ stored::Loopback

.. doxygenclass:: stored::Loopback

stored::MuxLayer
----------------

.. doxygenclass:: stored::MuxLayer

stored::NamedPipeLayer
----------------------

Expand Down
Loading
Loading