DEV Community

cppchedy
cppchedy

Posted on • Edited on

Rolling your own MOM, or how I did it - Messaging Model

Introduction

While part III of MOM series was mostly about understanding Seastar and putting the first milestone, the third part is dedicated to the Messaging Model of the System: we will present its elements and we will implement them in core module.

This article has both theory and code, so I hope you enjoy it and take it as inspiration for your models. Let's start.

Messaging Model

Some of you may wonder why we need a messaging model? I prefer to address this question before moving to the discussion on the model.

To answer this question properly, we need to consider the context of our system. If you recall, I said that we are building a software serving as a message relayer/broker for enterprise applications and depending on the situation at hand, enterprise devs need to come with a design that connects all software by using our middleware. However, I think the easier the learning curve of a system is the more it get adopted. As an example, look at c++, many complain that it's too large and difficult to learn. This is a drawback and a barrier for its adoption, even though the current number of its users doesn't reflect that.
I think that giving a small and clear mental model capturing all the details needed in the process of design and also other details of the internals of our system help to solve the problem of learning. Moreover, having a model for our system makes it easy to reason on how to solve the problem of message exchange and also is a key tool for expressing the intent of the architecture of the solution. Besides, the diagram showing the solution(using the element of the model) deployed in enterprise system serves as documentation for (new)team members.

Elements of the Messaging model

I prefer to follow an analyses approach instead of enumerating the elements with a definition. I think it's more beneficial so Let's identify user needs and extract from them concepts for the model.

Our messaging model is composed of elements that can be combined to form an architecture of message routes between its components(apps).

It's obvious that our model needs to include a concept/entity that represents enterprise application within the architecture. Thus, we introduce Client, the first element of our model.

Client lives outside the middleware and it's primarily an entity that interacts with the system. The server has a fixed and known address, so it can be accessed by any application. However, to route a message to its destination(s), the server needs the address of its client(s), therefore, the client needs to register itself in the system. This way the server gets the information needed for doing its task. Enter Context, the second element of our model that holds Client information and other related data. In contrast to Client, Context lives within the middleware and is essential for communication with the remote application.

Users of our system may need to send the same message from one to multiple application, or that some applications may be semantically related, therefore it's natural to tie them together. To enable such behavior, we add Group entity to our model. This element can be used to cluster Contexts according to user will.

Group is a central concept to our messaging model. It doesn't only relate Contexts to each other but also it defines message delivery(who exactly gets the message). We will talk more about the delivery process later on in this article.

We have been using Context for Group but let's use the right terminology: when we speak about Group, I prefer to call the contexts related to it Members.

Member can play the role of a consumer or a producer of messages. Each Group has one producer and multiple consumers. This design decision was taken to simplify the work I planned to do with the end2end ack(acknowledgment) feature (but didn't get to implement it). It's purely to respect deadlines so feel free to change this and redesign the system.

Contexts became members(with producer Xor consumer membership) of a Group when an application requests a joining(or subscribing) operation. That's when we Associate the two.

Message distribution policies

As we mentioned earlier, Groups define the process of message delivery. I planned to support 3 politics of message distribution:

  • Single-delivery: In this mode, the message is sent only to one consumer.

  • Per-match: In this policy, we are sending the message only if a certain "condition" is met. for each association with a consumer, we check if it respects the criteria, if so we send the message.

  • Publisher/Subscriber: You may already have guessed from the name, here, the message gets delivered to every consumer of the Group. Everyone is concerned with messages coming from the producer of the Group.

So We have three types of groups, with each type operating in one of the politics explained above.

Now that we have seen policies(and consequently defined the types of Group our system can have), we can move on to a related concept used in the Per-match mode: Annotation.

Recall that per-match groups do not deliver messages to a consumer unless they meet a criteria. In our model, we express this by Annotation placed on each Association. If the text from the message matches the Annotation, the system sends the message to the consumer.

Let's end this section with an example of a system architecture where we use the element of our model to describe the setup needed for message exchange.

This image shows a scenario where we send log messages to different destinations over the network depending on the severity level of the log(info, error, etc...). As depicted in the figure, we have a producer of logs and consumers, all of them are members of "MyGr" Group. From the picture, you see that each association is annotated with a severity level, and with it, the system can send the message to its appropriate destination.

TL;DR

  • Group: An entity that clusters together semantically related applications.

  • Client: An entity representing an enterprise application.

  • Context: An entity representing application inside the middleware.

  • Association: This entity represents the act of joining a Group.

  • Annotation: It's an entity representing a condition and it's attached to an Association.

I hope I was successful in explaining the messaging model of the system. Please feel free to raise questions in the comments if you find something unclear.

Implementation

Now that we discussed the messaging model and learned it's elements, we can go to talk about how we are going to implement it.

In this section, we will explore two modules: Identity module and Core module. The latter depends on the first one. We will see how.

Identity module

The notion of identity is so fundamental that you see it almost everywhere and our system is no exception. Specifically, we are using it in our model even though we didn't mention that explicitly. As we will see in the next section of this article, two elements from the model rely on the Identity class of the Identity Module.

Let's now focus on writing the module. We start with Identity class in the file src/identity/identity.hh:

#pragma once

#include <string>

// we will need these for upcomming functions
#include <boost/functional/hash.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>

template <typename T>
class Identity {
  T id_;

public:
  using value_type = T;

  Identity() = default;
  Identity(const Identity &) = default;
  Identity(Identity &&) = default;
  ~Identity() = default;

  Identity &operator=(const Identity &) = default;
  Identity &operator=(Identity &&) = default;

  Identity(T id) : id_{id} {
  }

  friend bool operator==(const Identity &lhs, const Identity &rhs) {
    return lhs.id_ == rhs.id_;
  }

  friend bool operator!=(const Identity &lhs, const Identity &rhs) {
    return !(lhs == rhs);
  }

  std::string toString() const {
    return idToString(id_);
  }

  const value_type &value() const {
    return id_;
  }
};
Enter fullscreen mode Exit fullscreen mode

as you have seen it's a simple value wrapper, I think it doesn't need explanation but we have to write idToString so let's add the header before class Identity and implement it in src/identity/identity.cpp:

// src/identity/identity.hh
#pragma once
//...

std::string idToString(boost::uuids::uuid uuid_);

std::string idToString(std::string str);

template <typename T>
class Identity {
//...
};

// src/identity/identity.cpp



Enter fullscreen mode Exit fullscreen mode
#include "identity.hh"

std::string idToString(boost::uuids::uuid uuid_) {
  return boost::uuids::to_string(uuid_);
}

std::string idToString(std::string str) {
  return str;
}
Enter fullscreen mode Exit fullscreen mode

Here we are using boost::uuids and the function that convert uuid to std:: string. Seastar depends on boost, so we are using it from there, you don't need to add anything to conan.

Spoiler, we will be using uuids as a template argument for Identity class for Clients and std::string for Groups in Core modules. we will look at it in more details.

Next we add an utility function for generating identity from chars:

//src/identity/identity.hpp
Identity<boost::uuids::uuid> makeIdentity(const char *id,
                                          boost::uuids::string_generator &gen) {
  return gen(std::string{id});
}
Enter fullscreen mode Exit fullscreen mode

Don't forget to add the header of this method to identity.hh.

We need to give the user of this module to generate uuids so we are implementing two functions one that generate an Identity<bst::uuid> and another for generating many at once. We will put these into other files, src/identity/generate_identity.hh and src/identity/generate_identity.cpp:

//src/identity/generate_identity.hh
#pragma once

#include <boost/uuid/uuid.hpp>            // uuid class
#include <boost/uuid/uuid_generators.hpp> // generators
#include <boost/uuid/uuid_io.hpp>

#include "identity.hh"

Identity<boost::uuids::uuid> generateIdentity();

std::vector<Identity<boost::uuids::uuid>> generateRange(int nb);

//src/identity/generate_identity.cpp
#include "identity_generator.hh"

Identity<boost::uuids::uuid> generateIdentity() {
  static boost::uuids::random_generator generator{};
  boost::uuids::uuid uuid_ = generator();
  return Identity<boost::uuids::uuid>{uuid_};
}

std::vector<Identity<boost::uuids::uuid>> generateRange(int nb) {
  std::vector<Identity<boost::uuids::uuid>> uuids;
  uuids.reserve(nb);

  while (--nb) {
    uuids.push_back(generateIdentity());
  }

  return uuids;
}
Enter fullscreen mode Exit fullscreen mode

With this cleared out, we are (almost)done with Identity module. we will now move to the Core module where we get to implement our messaging model.

Core module

Before seeing the code, I would like to share the line of thoughts on how I designed this module through a Question-driven approach(at least at the beginning).

We start with this general question: What's the intent of the module? what would it do?. In simple terms, The core module manages Entities of the model.

what do we mean exactly by manages and what are the entities we are referring to?. So manage here is perform the following operations on Clients and Groups(entities): Add, join, leave, register, unregister, delete...

So we have seen the use cases of the module, now let's think about the "API", what exactly are we going to expose and how are we going to design?

For start, we are already using Connection class in server_tcp which belongs to this module, so we know that it's going to be public.

Also, I suggest introducing a container-like class for our entities that exposes the operations discussed above. This class has the role of controlling our entities lifetime and relations. Topology is the name of this class.

To sum-up, the core module exposes two classes: Connection and Topology. Let's look at Topology declaration:

class Topology {
  std::unordered_map<Identity<boost::uuids::uuid>, Client> clients;
  std::unordered_map<Identity<std::string>, Group> groups;

public:
  Topology() = default;
  Topology(const Topology &) = default;
  Topology(Topology &) = default;
  Topology &operator=(Topology &) = default;
  ~Topology() = default;

  bool addGroup(Identity<std::string> id, const std::string &grpType);
  bool removeGroup(Identity<std::string> id);

  bool addClient(Identity<boost::uuids::uuid> id,
                 socket_address sock,
                 uint16_t portRemoteConn);

  std::string updateClient(Identity<boost::uuids::uuid> id,
                           socket_address sock,
                           uint16_t port);

  std::string registerClientAs(Identity<std::string> idGrp,
                               Identity<boost::uuids::uuid> idClt,
                               const std::string &membershipType,
                               const std::string &ann);
  std::string unregisterClientFrom(Identity<std::string> idGrp,
                                   Identity<boost::uuids::uuid> idClt,
                                   const std::string &membershipType);

  std::string pushMessageTo(Identity<std::string> idGrp,
                            Identity<boost::uuids::uuid> idClt,
                            const std::string &msg);

  size_t groupsSize() const;
  size_t clientsSize() const;

  bool clientExists(Identity<boost::uuids::uuid> id) const;
  bool groupExists(Identity<std::string> id) const;
};

Enter fullscreen mode Exit fullscreen mode

I prefer to leave the definition of the methods of this class to the next part because several of them involve details of the MOZA protocol. I think it's better to show the implementation in the following article.

Let's now focus on implementing the messaging model elements that we will be controlling through Topology class. Client class is the first one we are going to discuss. This class holds the state of the external apps needed for message exchange. To be more precise, This is the Context element of our messaging model. The name I choose may appear confusing because we have a Client concept, so why are we mapping Context to Client class? I added the Client concept only for convenience, so we don't need it in the implementation. Here is the code for Client:

//src/lib/core/client.hh
#include <algorithm>
#include <iostream>
#include <map>
#include <string>

#include <boost/uuid/uuid.hpp>

#include <seastar/core/future-util.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/net/api.hh>
#include <seastar/net/inet_address.hh>

#include "connection.hh"
#include "lib/identity/identity.hh"

using namespace seastar;
using namespace net;

class Client {
  Identity<boost::uuids::uuid> m_id;
  socket_address m_remoteAddress;
  uint16_t m_remotePort;
  std::map<std::string, seastar::queue<std::string>> m_channels;
  bool m_connected;

  //...

public:
  Client() = default;
  Client(const Client &) = delete;
  Client(Client &&) = default;
  ~Client() = default;

  Client &operator=(const Client &) = delete;
  Client &operator=(Client &&) = default;

  Client(Identity<boost::uuids::uuid> id, socket_address addr, uint16_t rPort)
    : m_id{id}, m_remoteAddress{addr}, m_remotePort{rPort}, m_connected{false} {
  }

  uint16_t getClientPort() const {
    return m_remotePort;
  }

  void setClientPort(uint16_t prt) {
    m_remotePort = prt;
  }

  socket_address getClientAddress() const {
    return m_remoteAddress;
  }

  void setClientAddress(socket_address addr) {
    m_remoteAddress = addr;
  }

  const Identity<boost::uuids::uuid> &identity() const {
    return m_id;
  }

  std::string addChannel(const std::string &groupNme) {
    this->m_channels.try_emplace(groupNme, 128);
    return groupNme;
  }

  bool push(const std::string &grpNme, std::string msg) {
    auto &qu = this->m_channels.at(grpNme);
    return qu.push(std::move(msg));
  }

  void startDelivery() {
    //...
  }
};
Enter fullscreen mode Exit fullscreen mode

As you can see, each Client instance is uniquely identified with a uuid stored as one of its attributes. Moreover, we store its remote address and port. This is needed when delivering messages. In addition, when a client joins a group as a consumer, it adds a dedicated channel(here a seastar::queue) and uses the name of the group as a key in the std::map.

Note that I omitted parts of the code( start delivery, etc..) because it implements one of the aspects of the protocol discussed in the next article.

After we have shown Client class, let's pass to the implementation of the Group concept from our model.

Group is a central element in our model. it is constructed from many other elements. I follow a top-down approach in the implementation of this concept. That means we will start with a top-level overview of the Group then go down to other concepts that make the group.

Let's talk about the attribute of Group. First, a group has a name that uniquely identifies it from the others. Secondly, a group has members each one with a specific role: it has a single member that produces messages and can have many consumers members. This is the state of the class representing the group. In addition to this, the main responsibility of a group is to deliver received messages to the consumers. However, as we discussed above, we have 3 policies of sending messages: single delivery, per-match, and publisher/subscriber.

In the previous paragraph, we have gathered a kind of specification for Group Concept on which we can start taking some design decision and go for the coding. One of the questions we need to ask is how are we going to implement the different policies of the delivery process? For this I suggest using polymorphism: have a base class that implements the common things and delegate differences of each policy to the other derived classes. Moreover, I prefer exposing a facade to hide the base class(a wrapper) and control instantiation via a factory method. I used Group as the name of the facade and GroupImpl as the base class. For the derived classes, we have 3: GroupSingleDelivery, GroupPerMatch, GroupPubSub. The following picture shows how each method in Group calls the one in GroupImpl and that Group holds a unique_ptr to GroupImpl.

Methods calls

We also show the methods and attributes of GroupImpl along with the derived classes with this picture:
GroupImpl and its childreen

Note that _distributeMessage(...) is a pure virtual member function.

Now that we exposed the design of the Group concept, we next show the implementation so here is Group code:

//src/lib/core/group.hh
#pragma once

#include <algorithm>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <string_view>

#include <boost/uuid/uuid.hpp>

#include <seastar/core/future-util.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/net/api.hh>
#include <seastar/net/inet_address.hh>

#include "lib/identity/identity.hh"
#include "member.hh"

using namespace seastar;
using namespace net;
//...
class Group {
  std::unique_ptr<GroupImpl> m_impl;

public:
  Group(std::unique_ptr<GroupImpl> grp) : m_impl{std::move(grp)} {
  }

  Group() = default;
  Group(Group &&) = default;
  Group &operator=(Group &&) = default;
  ~Group() = default;

  bool isProducer(Identity<boost::uuids::uuid> id) {
    return m_impl->_isProducer(id);
  }
  bool isConsumer(Identity<boost::uuids::uuid> id) {
    return m_impl->_isConsumer(id);
  }

  bool setProducer(Member prod) {
    return m_impl->_setProducer(prod);
  }

  void unsetProducer() {
    m_impl->_unsetProducer();
  }

  bool addConsumer(Member cons) {
    return m_impl->_addConsumer(cons);
  }

  void removeConsumer(Identity<boost::uuids::uuid> id) {
    m_impl->_removeConsumer(id);
  }

  std::string distributeMessage(std::string msg) {
    return m_impl->_distributeMessage(msg);
  }

  bool producerIsSet() const {
    return m_impl->_producerIsSet();
  }

  Identity<std::string> identity() const {
    return m_impl->_identity();
  }
};
//...
Enter fullscreen mode Exit fullscreen mode

As you can see, all methods mirror their counterparts, prefixed with _, from GroupImpl.

GroupImpl holds the state and implements all member functions except for _distributeMessage(...), so let's take a look at the code:

//src/lib/core/group.hh
#pragma
//includes...

class GroupImpl {
protected:
  Identity<std::string> m_identity;
  Member m_producer;
  std::vector<Member> m_consumers;
  bool m_prodSet = false;

public:
  GroupImpl() = default;
  GroupImpl(const GroupImpl &) = delete;
  GroupImpl(GroupImpl &&) = default;
  ~GroupImpl() = default;

  GroupImpl &operator=(const GroupImpl &) = delete;
  GroupImpl &operator=(GroupImpl &&) = default;

  GroupImpl(const std::string &str)
    : m_identity{str}, m_producer{}, m_consumers{}, m_prodSet{false} {
  }

  GroupImpl(Identity<std::string> id)
    : m_identity{id}, m_producer{}, m_consumers{}, m_prodSet{false} {
  }

  bool _isProducer(Identity<boost::uuids::uuid> id);
  bool _isConsumer(Identity<boost::uuids::uuid> id);

  bool _setProducer(Member prod);
  bool _addConsumer(Member cons);

  void _unsetProducer();
  void _removeConsumer(Identity<boost::uuids::uuid> id);

  bool _producerIsSet() const;

  Identity<std::string> _identity() const;

  void notify();

  virtual std::string _distributeMessage(std::string msg) = 0;
};

class Group {
//...
};

//...

Enter fullscreen mode Exit fullscreen mode

Note that GroupImpl is before Group so make sure you respect the order. This is because we declared and defined Group in the header file so the compiler needs to know the full type of GroupImpl because we are using its methods in the definition of Group.

GroupImpl data members are marked as protected so that we can access them in derived classes. Moreover, we introduced Member struct representing the Member concept in our model. We will discuss its implementation after we deal with the derived classes from GroupImpl. Let's now look into the definition of GroupImpl member functions:

//src/lib/core/group.cpp
#include "group.hh"

bool GroupImpl::_isProducer(Identity<boost::uuids::uuid> id) {

  if (!this->_producerIsSet())
    return false;
  return m_producer.clt->identity() == id;
}
bool GroupImpl::_isConsumer(Identity<boost::uuids::uuid> id) {
  return std::any_of(
      begin(m_consumers), end(m_consumers),
      [&id](const Member &mbr) { return mbr.clt->identity() == id; });
}

bool GroupImpl::_setProducer(Member prod) {
  if (_isConsumer(prod.clt->identity()))
    return false;

  m_producer = prod;
  m_prodSet = true;
  return true;
}
bool GroupImpl::_addConsumer(Member cons) {
  if (_isProducer(cons.clt->identity()) || _isConsumer(cons.clt->identity()))
    return false;

  m_consumers.push_back(cons);
  return true;
}

void GroupImpl::_unsetProducer() {
  m_prodSet = false;
  m_producer.clt = nullptr;
}

void GroupImpl::_removeConsumer(Identity<boost::uuids::uuid> id) {
  this->m_consumers.erase(std::remove_if(begin(m_consumers), end(m_consumers),
                                         [&id](const Member &mbr) {
                                           return mbr.clt->identity() == id;
                                         }),
                          m_consumers.end());
}

bool GroupImpl::_producerIsSet() const {
  return m_prodSet;
}

Identity<std::string> GroupImpl::_identity() const {
  return m_identity;
}
Enter fullscreen mode Exit fullscreen mode

I think the code presented here is clear but if not, feel free to leave a comment, I will happily explain.

With GroupImpl out of the way, let's pass to the derived classes. We start with GroupSingleDelivery but I will remove part of codes that have to do with the protocol:

//src/lib/core/group.hh
//includes
//class GroupImpl
//class group
//...
class GroupSingleDelivery : public GroupImpl {
  int nextDst = 0;

public:
  using GroupImpl::GroupImpl;
  std::string _distributeMessage(std::string msg) {
    auto consSize = m_consumers.size();
    if (consSize == 0)
      return //...;
    if (!this->m_consumers[nextDst % consSize].clt->push(m_identity.toString(),
                                                         msg))
      return //...;
    using namespace std::chrono_literals;
    seastar::sleep(1s).then([this, consSize] {this->m_consumers[nextDst % consSize].clt->startDelivery();});
    ++nextDst;
    return //...;
  }
};
//...
Enter fullscreen mode Exit fullscreen mode

GroupSingleDelivery class send a received message to only one of its consumers and it keeps track of next destination using nextDst data member modulus the size of consumers in the current group.

std::string _distributeMessage(std::string msg) takes the message we want to deliver and push it to one of the consumers queue then trigger the delivery process.

Next is GroupPubSub class. Here is the code :

//src/lib/core/group.hh
//...
//class GroupSingleDe...
//...
class GroupPubSub : public GroupImpl {

public:
  using GroupImpl::GroupImpl;
  std::string _distributeMessage(std::string msg) {

    auto consSize = m_consumers.size();
    if (consSize == 0)
      return //...;
    bool queuedForAtLeastOne = false;
    for (const auto &elm : m_consumers) {
      queuedForAtLeastOne =
           elm.clt->push(m_identity.toString(), msg) || queuedForAtLeastOne ;
      elm.clt->startDelivery();
    }
    if (!queuedForAtLeastOne)
      return //...;
    return //...;
  }
};
Enter fullscreen mode Exit fullscreen mode

In this version, the message gets delivered to all consumers, hence the range-based for loop iterating over the vector of consumers and pushing the message into each Client channel and start the delivery process for each consumer.

Instead of showing the code of GroupPerMatch I will jump to other concepts from the model that we are using and that we didn't define them. _distributeMessage of GroupPerMatch use a bit more of Member so I think it makes sense to go deep(Member, Association,...) and then return to finish the last derived class even though.

Member class is an abstraction that points to an actual Client member of a Group and holds information needed for the pushing and delivery process:

//src/lib/core/member.hh
#pragma once

#include <string>

#include "client.hh"
//...
struct Member {
  Client *clt;
  Association ass;

  Member() : clt{nullptr}, ass{""} {
  }
  ~Member() = default;
};
Enter fullscreen mode Exit fullscreen mode

As you may have guessed, Association is the class that holds the data needed for the business logic of per-match distribution and offer to verify the criteria. This is how it's implemented:

//src/lib/core/member.hh
class Association {
  std::string m_annotation;
  std::string m_queueName;

public:
  Association(std::string qn, std::string ann)
    : m_annotation{std::move(ann)}, m_queueName{std::move(qn)} {
  }
  Association(std::string qn) : m_annotation{}, m_queueName{std::move(qn)} {
  }

  bool hasAnnotation() const {
    return m_annotation != "";
  }

  bool verify(const std::string &ann) const {
    return m_annotation == ann;
  }

  const std::string &dstChannelName() const {
    return m_queueName;
  }
};

//struct Member...
Enter fullscreen mode Exit fullscreen mode

Association holds an annotation used in the per-match delivery policy and a name of the queue where to push the message. With this, we have finished implementing concept related to Group but still, we have to finish GroupPerMatch.

Disclaimer: When designing the protocol and developing the modules I forgot about something very important for _distributeMessage of GroupPerMatch. This leads to an ugly solution or rather say a hack to make it work and not something clean and sensable. So what I am presenting here is literally my mistake and not something you would take but rather avoid. I couldn't make a proper fix because I have almost no time left and I needed it to work.

What I forgot about is to pass the annotation as an argument in the protocol and I already implemented the parsing so I went to actually encode it with the data, hence, rather than getting it for free from the protocol parser I had to inject a parsing function inside the _distributeMessage method. Moreover, I didn't account for the extra annotation argument in _distributeMessage method. In the ideal case the signature would be virtual std::string _distributeMessage(std::string msg, std::string ann=0) = 0; and the argument will be extracted using the protocol parser.

What I ended up doing was encoding the annotation inside the actual data like so: *****info-Hello where Hello is the data to be delivered and info is the annotation delimited by ***** and -.

//...
class GroupPerMatch : public GroupImpl {

public:
  using GroupImpl::GroupImpl;
  std::string _distributeMessage(std::string msg) {

    auto consSize = m_consumers.size();
    if (consSize == 0)
      return //...;

    auto extractCritere =
        [](std::string &data) -> compat::optional<std::string> {
      const std::string opMark = "******";
      auto first = begin(data);
      auto last = begin(data) + 7;
      std::string_view str{data.c_str(), 7};

      if (str == opMark)
        return {};
      std::string ann;
      ann.reserve(256);

      auto _pos = std::find(begin(data) + 7, end(data), '-');
      if (_pos == end(data))
        return {};

      std::copy(begin(data) + 7, _pos, std::back_inserter(ann));

      data.erase(std::rotate(begin(data), _pos + 1, end(data)));

      return {ann};
    };

    auto critere = extractCritere(msg);

    if (!critere.has_value())
      return //...;
    auto crt = critere.value();

    auto start = begin(m_consumers);

    bool matchedOne = false;

    do {

      auto cons =
          std::find_if(start, end(m_consumers), [&crt](const Member &meb) {
            return meb.ass.verify(crt);
          });
      if (cons == end(m_consumers)) {
        if (!matchedOne)
          return //....;
        return //...;
      }
      cons->clt->push(m_identity.toString(), msg);
      cons->clt->startDelivery();
      matchedOne = true;
      start = cons + 1;
    } while (true);
  }
};
//...class GroupPubSub
Enter fullscreen mode Exit fullscreen mode

So we start with our usual check for the number of consumers then we define a lambda that will parse and extract the encoded annotation if any. This lambda returns an optional. The do {} while(); loop is to check for the consumers that verifies the critiria so that we can push and start the delivery process.

As I said in the disclaimer, this is not the correct way of doing it. Just something to make work. In a future article, I will be addressing this.

Another pitfall in my design is that the core module contains pieces of code directly related to the protocol module, hence an understanding of the protocol is required before seeing them. This is why we will be leaving the definition of the Topology class methods for the next article along with the protocol module. Also, I will suggest a way to get read of this direct dependency.

Conclusion

We have covered the messaging model of our Message-oriented middleware and also have shown most of the implementation. I hope you take this model as a source of inspiration when designing your own.

Next Part

In the next article, we will be talking about the protocol MOZA, that enables communication between applications and the middleware.

Top comments (0)