YAMI4 C++ Library 2.0.0
Messaging Solution for Distributed Systems
|
Message broker. More...
#include <agent.h>
Public Types | |
typedef long long | outgoing_message_id |
Outgoing message identifier type. | |
Public Member Functions | |
agent (const parameters &options=parameters()) | |
Constructor. More... | |
agent (event_callback &event_listener, const parameters &options=parameters()) | |
Constructor. More... | |
~agent () | |
Destructor. More... | |
std::string | add_listener (const std::string &listener) |
Adds new listener. More... | |
void | remove_listener (const std::string &listener) |
Removes existing listener. More... | |
template<typename functor > | |
void | register_object (const std::string &object_name, functor &f) |
Registers the new logical destination object. More... | |
void | register_raw_object (const std::string &object_name, void(*callback)(incoming_message &im, void *hint), void *hint) |
void | register_value_publisher (const std::string &object_name, value_publisher &publisher) |
Registers the value publisher as a new logical object. More... | |
void | unregister_object (const std::string &object_name) |
Unregisters the logical destination object. More... | |
void | open_connection (const std::string &target) |
Opens the new connection. More... | |
void | open_connection (const std::string &target, const parameters &options) |
Opens the new connection with overriding options. More... | |
outgoing_message_id | get_next_message_id () |
Obtains unique message id. More... | |
std::unique_ptr< outgoing_message > | send (const std::string &target, const std::string &object_name, const std::string &message_name, const serializable &content=parameters(), std::size_t priority=0, bool auto_connect=true) |
Sends the new outgoing message. More... | |
void | send (outgoing_message &message, const std::string &target, const std::string &object_name, const std::string &message_name, const serializable &content=parameters(), std::size_t priority=0, bool auto_connect=true) |
Sends the outgoing message. More... | |
template<typename functor > | |
outgoing_message_id | send (functor &f, const std::string &target, const std::string &object_name, const std::string &message_name, const serializable &content=parameters(), std::size_t priority=0, bool auto_connect=true) |
Sends the outgoing message. More... | |
std::unique_ptr< outgoing_message > | send (const std::string &target, const serializable &header, const serializable &content, outgoing_message_id message_id, std::size_t priority=0, bool auto_connect=true) |
Sends the new outgoing message with a custom header. More... | |
void | send (outgoing_message &message, const std::string &target, const serializable &header, const serializable &content, outgoing_message_id message_id, std::size_t priority=0, bool auto_connect=true) |
Sends the outgoing message with a custom header. More... | |
template<typename functor > | |
void | send (functor &f, const std::string &target, const serializable &header, const serializable &content, outgoing_message_id message_id, std::size_t priority=0, bool auto_connect=true) |
Sends the outgoing message with a custom header. More... | |
void | clean_outgoing_message_callback (outgoing_message_id id) |
Cleans internal resources for the given message callback. More... | |
void | send_one_way (const std::string &target, const serializable &header, const serializable &content, outgoing_message_id message_id, std::size_t priority=0, bool auto_connect=true) |
Sends the new outgoing message. More... | |
void | send_one_way (const std::string &target, const std::string &object_name, const std::string &message_name, const serializable &content=parameters(), std::size_t priority=0, bool auto_connect=true) |
Sends the new outgoing message with custom header. More... | |
void | close_connection (const std::string &target, std::size_t priority=0) |
Closes the given communication channel. More... | |
void | hard_close_connection (const std::string &target) |
Immediately closes the given communication channel. More... | |
template<typename functor > | |
void | register_connection_event_monitor (functor &f) |
Registers the monitor for connection-related events. More... | |
template<typename functor > | |
void | register_io_error_logger (functor &f) |
Registers the logger for I/O errors. More... | |
void | get_outgoing_flow_state (std::size_t ¤t_level, std::size_t &high_water_mark, std::size_t &low_water_mark) const |
Obtains the state of overall outgoing flow. More... | |
void | get_channel_usage (std::size_t &max_allowed, std::size_t &used) |
Returns the selector's channel usage counters. More... | |
std::size_t | get_pending_outgoing_bytes (const std::string &target) |
Returns the size of outgoing queue for the given channel. More... | |
void | run_worker () |
Runs the internal worker activities. More... | |
void | run_dispatcher (std::size_t dispatcher_index) |
Runs the message dispatching activities. More... | |
Message broker.
The message broker that encapsulates physical channel management, incoming and outgoing message queues, listeners and resource management.
A single agent object can manage many listeners, which are responsible for accepting remote connections, and many incoming and outgoing connections.
The agent objects can be created and destroyed without constraints on the stack, on the free store or as static objects.
The objects of this class can be safely used by multiple threads.
yami::agent::agent | ( | const parameters & | options = parameters() | ) |
Constructor.
Creates the message broker and starts its internal threads. The broker created with this constructor has no listener.
yami::agent::agent | ( | event_callback & | event_listener, |
const parameters & | options = parameters() |
||
) |
Constructor.
Creates the message broker with event notification callback. The broker created with this constructor has no listener.
yami::agent::~agent | ( | ) |
Destructor.
The destructor stops the internal threads and cleans up all internal data structures.
Note: The messages and replies that were posted for transmission and that have not yet been fully transmitted are abandoned; in the case of outgoing messages their state is properly notified about that fact.
std::string yami::agent::add_listener | ( | const std::string & | listener | ) |
Adds new listener.
Adds a new listener for the given target address.
The supported target formats are:
host
can be provided in the symbolic or numeric formThe port for TCP/IP and UDP protocols can be 0
or *
, in which case the actual port number is assigned by the system.
listener | The target name for the new listener. |
void yami::agent::clean_outgoing_message_callback | ( | outgoing_message_id | id | ) |
Cleans internal resources for the given message callback.
Cleans internal resources associated with the given message callback. The application should take care not to clear callback that are potentially executed.
id | Outgoing message identifier, as obtained from send or from get_next_message_id. |
void yami::agent::close_connection | ( | const std::string & | target, |
std::size_t | priority = 0 |
||
) |
Closes the given communication channel.
Closes the channel identified by name.
The priority allows to properly handle the existing outgoing messages that are waiting in the outgoing queue for transmission. The existing messages with lower priority are abandoned, whereas the existing messages with priority equal or higher to the one provided as parameter are retained in the outgoing queue and are properly pushed for transmission before the channel is physically closed. The channel is closed immediately only if there are no messages waiting in its outgoing queue.
target | The name of the target endpoint. |
priority | Proprity of the request, respects existing messages in the outgoing queue. |
void yami::agent::get_channel_usage | ( | std::size_t & | max_allowed, |
std::size_t & | used | ||
) |
Returns the selector's channel usage counters.
Note: The information obtained with this function can be in constant flow, as incoming and outgoing channels are created and closed.
max_allowed | The maximum number of channels that the agent can handle internally or 0 if the limit is unknown. |
used | Total number of handled channels and listeners. |
outgoing_message_id yami::agent::get_next_message_id | ( | ) |
Obtains unique message id.
Obtains unique message id, appropriate for use in customized message headers.
void yami::agent::get_outgoing_flow_state | ( | std::size_t & | current_level, |
std::size_t & | high_water_mark, | ||
std::size_t & | low_water_mark | ||
) | const |
Obtains the state of overall outgoing flow.
Obtains the state of overall outgoing flow.
Note: The outgoing flow is a combination of all outgoing traffic, and is not tied to any particular communication channel.
current_level | The current level of the outgoing flow. |
high_water_mark | The high water mark. |
low_water_mark | The low water mark. |
std::size_t yami::agent::get_pending_outgoing_bytes | ( | const std::string & | target | ) |
Returns the size of outgoing queue for the given channel.
target | The name of the target endpoint. |
void yami::agent::hard_close_connection | ( | const std::string & | target | ) |
Immediately closes the given communication channel.
Closes the channel identified by name.
The channel is closed immediately and those messages that are waiting in its outgoing queue are abandoned. Integrity of the message that was already partly transmitted is not guaranteed.
target | The name of the target endpoint. |
void yami::agent::open_connection | ( | const std::string & | target | ) |
Opens the new connection.
Opens the new channel or does nothing if the channel already exists.
This function is not necessary with automatic connection recovery option in send
and send_one_way
.
target | The name of the target endpoint. This name should correspond to the listener name in some target agent object. |
void yami::agent::open_connection | ( | const std::string & | target, |
const parameters & | options | ||
) |
Opens the new connection with overriding options.
Opens the new channel or does nothing if the channel already exists. If the new channel is created, it will use the overriding options from those which are defined.
This function is not necessary with automatic connection recovery option in send
and send_one_way
.
target | The name of the target endpoint. This name should correspond to the listener name in some target agent object. |
options | The set of options that will override agent's values. |
|
inline |
Registers the monitor for connection-related events.
Registers the monitor for connection events.
Note: The monitor callback is intentionally not synchronized. Use this function after constructing the agent, but before opening any connections.
f | The callable entity that can accept the std::string as the connection name and connection_event as event description. |
|
inline |
Registers the logger for I/O errors.
Registers the logger for I/O errors.
Note: The logger callback is intentionally not synchronized. Use this function after constructing the agent, but before opening any connections.
f | The callable entity that can accept the int as the error code and const char * as error description. |
|
inline |
Registers the new logical destination object.
Registers the new "object" that can be a logical destination for incoming messages.
object_name | The name of the newly registered object. If an object with this name is already registered, the registration data is replaced. |
f | The callable entity that can accept the incoming_message as the invocation parameter. |
void yami::agent::register_value_publisher | ( | const std::string & | object_name, |
value_publisher & | publisher | ||
) |
Registers the value publisher as a new logical object.
object_name | The name of the newly registered object. If an object with this name is already registered, the registration data is replaced. |
publisher | The value publisher to be registered. |
void yami::agent::remove_listener | ( | const std::string & | listener | ) |
Removes existing listener.
Removes the listener denoted by its actual target name. Note that the actual target name might be different from the name provided when the listener was created, due to target resolution. The name which should be used for listener removal is the name that is returned by the add_listener
function.
void yami::agent::run_dispatcher | ( | std::size_t | dispatcher_index | ) |
Runs the message dispatching activities.
Runs the dispatcher activities on behalf of the agent. This function allows the application to provide its own thread of execution for the agent's needs, as with RTOS no threads are explicitly created by the agent object. This function returns when the agent is destroyed or when internal error occurs.
dispatcher_index | The index, starting from 0, of the dispatcher. There should be as many dispatcher threads calling this function, with indices from 0 to N-1, as defined in options, with N==1 by default. |
void yami::agent::run_worker | ( | ) |
Runs the internal worker activities.
Runs the internal worker activities on behalf of the agent. This function allows the application to provide its own thread of execution for the agent's needs, as with RTOS no threads are explicitly created by the agent object. This function returns when the agent is destroyed or when internal error occurs.
std::unique_ptr< outgoing_message > yami::agent::send | ( | const std::string & | target, |
const serializable & | header, | ||
const serializable & | content, | ||
outgoing_message_id | message_id, | ||
std::size_t | priority = 0 , |
||
bool | auto_connect = true |
||
) |
Sends the new outgoing message with a custom header.
Sends the new outgoing message to the given destination, with fully customized message header.
target | The name of the target endpoint. This name should correspond to the listener name in some target agent object. |
header | The header of the message. |
content | The content of the message. |
message_id | Message identifier. |
priority | The priority of the message. |
auto_connect | The flag controlling automatic (re)connection. |
outgoing_message
object that allows to track the progress of this message, its status and obtain response data.Note: See get_next_message_id
. Note: This function implicitly opens a new communication channel if it is not already open. This channel is kept open until it is explicitly closed (see the close_connection
function) or until the agent is destroyed or the communication error is detected.
std::unique_ptr< outgoing_message > yami::agent::send | ( | const std::string & | target, |
const std::string & | object_name, | ||
const std::string & | message_name, | ||
const serializable & | content = parameters() , |
||
std::size_t | priority = 0 , |
||
bool | auto_connect = true |
||
) |
Sends the new outgoing message.
Sends the new outgoing message to the given destination.
target | The name of the target endpoint. This name should correspond to the listener name in some target agent object. |
object_name | The name of the logical destination object in the target agent. |
message_name | The name of the message. |
content | The content of the message. |
priority | The priority of the message. |
auto_connect | The flag controlling automatic (re)connection. |
outgoing_message
object that allows to track the progress of this message, its status and obtain response data.Note: This function implicitly opens a new communication channel if it is not already open. This channel is kept open until it is explicitly closed (see the close_connection
function) or until the agent is destroyed or the communication error is detected.
|
inline |
Sends the outgoing message with a custom header.
Sends the outgoing message to the given destination and executes the callback whenever the status of the message changes. The callback should accept the outgoing_message object as its parameter. Note: the callback should not block the processing unnecessarily.
This function behaves as the other version of send, except for the outgoing_message object creation policy.
|
inline |
Sends the outgoing message.
Sends the outgoing message to the given destination and executes the callback whenever the status of the message changes. The callback should accept the outgoing_message object as its parameter. Note: the callback should not block the processing unnecessarily.
This function behaves as the other version of send, except for the outgoing_message object creation policy.
void yami::agent::send | ( | outgoing_message & | message, |
const std::string & | target, | ||
const serializable & | header, | ||
const serializable & | content, | ||
outgoing_message_id | message_id, | ||
std::size_t | priority = 0 , |
||
bool | auto_connect = true |
||
) |
Sends the outgoing message with a custom header.
Sends the outgoing message to the given destination and reinitializes the message object in-place. Allows to use fully customized headers.
This function behaves as the other version of send, except for the outgoing_message object creation policy.
void yami::agent::send | ( | outgoing_message & | message, |
const std::string & | target, | ||
const std::string & | object_name, | ||
const std::string & | message_name, | ||
const serializable & | content = parameters() , |
||
std::size_t | priority = 0 , |
||
bool | auto_connect = true |
||
) |
Sends the outgoing message.
Sends the outgoing message to the given destination and reinitializes the message object in-place.
This function behaves as the other version of send, except for the outgoing_message object creation policy.
void yami::agent::send_one_way | ( | const std::string & | target, |
const serializable & | header, | ||
const serializable & | content, | ||
outgoing_message_id | message_id, | ||
std::size_t | priority = 0 , |
||
bool | auto_connect = true |
||
) |
Sends the new outgoing message.
Sends the new outgoing message to the given destination, without the possibility to track its progress.
See the description and notes for the send
function.
void yami::agent::send_one_way | ( | const std::string & | target, |
const std::string & | object_name, | ||
const std::string & | message_name, | ||
const serializable & | content = parameters() , |
||
std::size_t | priority = 0 , |
||
bool | auto_connect = true |
||
) |
Sends the new outgoing message with custom header.
Sends the new outgoing message to the given destination, without the possibility to track its progress and with a fully customized message header.
See the description and notes for the send
function.
void yami::agent::unregister_object | ( | const std::string & | object_name | ) |
Unregisters the logical destination object.
It is permitted to request unregistration for an object that does not exist - such operation has no effect.
Note: Due to performance and design tradeoffs it is not guaranteed that no more messages will be ever dispatched to the given object when this function returns. In fact, some of the messages that have been received by agent and not yet dispatched might be still dispatched shortly after this function returns. Only those messages that are received by agent after this function returns are guaranteed not to be dispatched to the unregistered object. This might be particularly important with regard to the lifetime of the callable entity that was provided when the given object has been registered.
object_name | The name of the object to be unregistered. |