YAMI4 C++ Library 2.0.0
Messaging Solution for Distributed Systems
yami::agent Class Reference

Message broker. More...

#include <agent.h>

Public Types

typedef long long outgoing_message_id
 Outgoing message identifier type.
 

Public Member Functions

 agent (const parameters &options=parameters())
 Constructor. More...
 
 agent (event_callback &event_listener, const parameters &options=parameters())
 Constructor. More...
 
 ~agent ()
 Destructor. More...
 
std::string add_listener (const std::string &listener)
 Adds new listener. More...
 
void remove_listener (const std::string &listener)
 Removes existing listener. More...
 
template<typename functor >
void register_object (const std::string &object_name, functor &f)
 Registers the new logical destination object. More...
 
void register_raw_object (const std::string &object_name, void(*callback)(incoming_message &im, void *hint), void *hint)
 
void register_value_publisher (const std::string &object_name, value_publisher &publisher)
 Registers the value publisher as a new logical object. More...
 
void unregister_object (const std::string &object_name)
 Unregisters the logical destination object. More...
 
void open_connection (const std::string &target)
 Opens the new connection. More...
 
void open_connection (const std::string &target, const parameters &options)
 Opens the new connection with overriding options. More...
 
outgoing_message_id get_next_message_id ()
 Obtains unique message id. More...
 
std::unique_ptr< outgoing_message > send (const std::string &target, const std::string &object_name, const std::string &message_name, const serializable &content=parameters(), std::size_t priority=0, bool auto_connect=true)
 Sends the new outgoing message. More...
 
void send (outgoing_message &message, const std::string &target, const std::string &object_name, const std::string &message_name, const serializable &content=parameters(), std::size_t priority=0, bool auto_connect=true)
 Sends the outgoing message. More...
 
template<typename functor >
outgoing_message_id send (functor &f, const std::string &target, const std::string &object_name, const std::string &message_name, const serializable &content=parameters(), std::size_t priority=0, bool auto_connect=true)
 Sends the outgoing message. More...
 
std::unique_ptr< outgoing_message > send (const std::string &target, const serializable &header, const serializable &content, outgoing_message_id message_id, std::size_t priority=0, bool auto_connect=true)
 Sends the new outgoing message with a custom header. More...
 
void send (outgoing_message &message, const std::string &target, const serializable &header, const serializable &content, outgoing_message_id message_id, std::size_t priority=0, bool auto_connect=true)
 Sends the outgoing message with a custom header. More...
 
template<typename functor >
void send (functor &f, const std::string &target, const serializable &header, const serializable &content, outgoing_message_id message_id, std::size_t priority=0, bool auto_connect=true)
 Sends the outgoing message with a custom header. More...
 
void clean_outgoing_message_callback (outgoing_message_id id)
 Cleans internal resources for the given message callback. More...
 
void send_one_way (const std::string &target, const serializable &header, const serializable &content, outgoing_message_id message_id, std::size_t priority=0, bool auto_connect=true)
 Sends the new outgoing message with a custom header. More...
 
void send_one_way (const std::string &target, const std::string &object_name, const std::string &message_name, const serializable &content=parameters(), std::size_t priority=0, bool auto_connect=true)
 Sends the new outgoing message. More...
 
void close_connection (const std::string &target, std::size_t priority=0)
 Closes the given communication channel. More...
 
void hard_close_connection (const std::string &target)
 Immediately closes the given communication channel. More...
 
template<typename functor >
void register_connection_event_monitor (functor &f)
 Registers the monitor for connection-related events. More...
 
template<typename functor >
void register_io_error_logger (functor &f)
 Registers the logger for I/O errors. More...
 
void get_outgoing_flow_state (std::size_t &current_level, std::size_t &high_water_mark, std::size_t &low_water_mark) const
 Obtains the state of overall outgoing flow. More...
 
void get_channel_usage (std::size_t &max_allowed, std::size_t &used)
 Returns the selector's channel usage counters. More...
 
std::size_t get_pending_outgoing_bytes (const std::string &target)
 Returns the size of outgoing queue for the given channel. More...
 
void run_worker ()
 Runs the internal worker activities. More...
 
void run_dispatcher (std::size_t dispatcher_index)
 Runs the message dispatching activities. More...
 

Detailed Description

Message broker.

The message broker that encapsulates physical channel management, incoming and outgoing message queues, listeners and resource management.

A single agent object can manage many listeners, which are responsible for accepting remote connections, and many incoming and outgoing connections.

The agent objects can be created and destroyed without constraints on the stack, on the free store or as static objects.

The objects of this class can be safely used by multiple threads.

Constructor & Destructor Documentation

◆ agent() [1/2]

yami::agent::agent ( const parameters &  options = parameters())

Constructor.

Creates the message broker and starts its internal threads. The broker created with this constructor has no listener.

◆ agent() [2/2]

yami::agent::agent ( event_callback &  event_listener,
const parameters &  options = parameters() 
)

Constructor.

Creates the message broker with event notification callback. The broker created with this constructor has no listener.

◆ ~agent()

yami::agent::~agent ( )

Destructor.

The destructor stops the internal threads and cleans up all internal data structures.

Note: Messages and replies that were posted for transmission but have not yet been fully transmitted are abandoned; for outgoing messages, the state of the corresponding message object is updated to reflect this.

Member Function Documentation

◆ add_listener()

std::string yami::agent::add_listener ( const std::string &  listener)

Adds new listener.

Adds a new listener for the given target address.

The supported target formats are:

  • "tcp://host:port" for TCP/IP connections, where host can be provided in the symbolic or numeric form
  • "tcp://*:port" for TCP/IP connections, for "any" local address
  • "tcp://port" for TCP/IP connections, for "any" local address
  • "udp://host:port" for UDP communication, with rules as for TCP/IP
  • "unix://path" for Unix connections

The port for TCP/IP and UDP protocols can be 0 or *, in which case the actual port number is assigned by the system.

Parameters
listener    The target name for the new listener.
Returns
The locally resolved listener name. This name can be used to remove the listener later on.
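
As a minimal sketch of how the resolved name is meant to be used, the helpers below open a listener on a system-assigned TCP port and later remove it by the name that add_listener returned. The helper names listen_on_any_port and stop_listening are hypothetical and not part of YAMI4. Agent is a template parameter only so that the sketch compiles without the library headers; in real use it is expected to be yami::agent.

```cpp
#include <string>

// Hypothetical helper, not part of YAMI4.
// Agent is expected to be yami::agent; it is a template parameter only
// so that this sketch does not depend on the library headers.
template <typename Agent>
std::string listen_on_any_port(Agent& a)
{
    // "*" selects any local address and port 0 lets the system choose.
    // The return value is the locally resolved name, which may differ
    // from the requested one and is the name remove_listener expects.
    return a.add_listener("tcp://*:0");
}

// Hypothetical helper: stops a listener started with listen_on_any_port.
template <typename Agent>
void stop_listening(Agent& a, const std::string& resolved_name)
{
    a.remove_listener(resolved_name);
}
```

Keeping the returned string, rather than the requested "tcp://*:0", matters because only the resolved name carries the port that the system actually assigned.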

◆ clean_outgoing_message_callback()

void yami::agent::clean_outgoing_message_callback ( outgoing_message_id  id)

Cleans internal resources for the given message callback.

Cleans internal resources associated with the given message callback. The application should take care not to clean up callbacks that might still be executed.

Parameters
id    Outgoing message identifier, as obtained from send or from get_next_message_id.

◆ close_connection()

void yami::agent::close_connection ( const std::string &  target,
std::size_t  priority = 0 
)

Closes the given communication channel.

Closes the channel identified by name.

The priority determines how messages already waiting in the outgoing queue are handled. Waiting messages with a priority lower than the one given are abandoned, whereas those with a priority equal to or higher than the one given are retained in the outgoing queue and transmitted before the channel is physically closed. The channel is closed immediately only if there are no messages waiting in its outgoing queue.

Parameters
target      The name of the target endpoint.
priority    Priority of the request; it respects existing messages in the outgoing queue.
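
The priority rule can be illustrated with a small stand-alone model. This is only a sketch of the documented behaviour, not the library's implementation: the queued_message type and the retained_on_close function are hypothetical, and the agent's real outgoing queue is not exposed in this form.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Illustrative model only: YAMI4's internal queue is not exposed and
// may be organised differently.
struct queued_message
{
    std::size_t priority;
};

// Returns the messages that would still be transmitted before the
// channel is closed with the given priority: those with a priority
// equal to or higher than close_priority. Lower-priority messages
// are dropped from the result, mirroring "abandoned" in the text.
std::vector<queued_message> retained_on_close(
    std::vector<queued_message> queue, std::size_t close_priority)
{
    queue.erase(
        std::remove_if(queue.begin(), queue.end(),
            [close_priority](const queued_message& m)
            { return m.priority < close_priority; }),
        queue.end());
    return queue;
}
```

Under this rule the default priority of 0 abandons nothing, because no queued message can have a priority lower than 0.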

◆ get_channel_usage()

void yami::agent::get_channel_usage ( std::size_t &  max_allowed,
std::size_t &  used 
)

Returns the selector's channel usage counters.

Note: The information obtained with this function can be in constant flux, as incoming and outgoing channels are created and closed.

Parameters
max_allowed    The maximum number of channels that the agent can handle internally, or 0 if the limit is unknown.
used           Total number of handled channels and listeners.

◆ get_next_message_id()

outgoing_message_id yami::agent::get_next_message_id ( )

Obtains unique message id.

Obtains unique message id, appropriate for use in customized message headers.

Returns
The new unique message identifier.

◆ get_outgoing_flow_state()

void yami::agent::get_outgoing_flow_state ( std::size_t &  current_level,
std::size_t &  high_water_mark,
std::size_t &  low_water_mark 
) const

Obtains the state of overall outgoing flow.

Note: The outgoing flow is a combination of all outgoing traffic, and is not tied to any particular communication channel.

Parameters
current_level      The current level of the outgoing flow.
high_water_mark    The high water mark.
low_water_mark     The low water mark.
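
Because all three values are returned through reference parameters, a caller may find it convenient to collect them into one value. The following is a sketch under that assumption; flow_state and read_flow_state are hypothetical names, not part of YAMI4, and Agent stands in for yami::agent so that the sketch compiles without the library headers.

```cpp
#include <cstddef>

// Hypothetical value type, not part of YAMI4.
struct flow_state
{
    std::size_t current_level;
    std::size_t high_water_mark;
    std::size_t low_water_mark;
};

// Hypothetical helper: reads the three out-parameters in one call.
// Agent is expected to be yami::agent; the function it calls is const,
// so a const reference is sufficient here.
template <typename Agent>
flow_state read_flow_state(const Agent& a)
{
    flow_state s = {0, 0, 0};
    a.get_outgoing_flow_state(
        s.current_level, s.high_water_mark, s.low_water_mark);
    return s;
}
```

The result is a snapshot of the combined outgoing traffic and says nothing about any single channel.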

◆ get_pending_outgoing_bytes()

std::size_t yami::agent::get_pending_outgoing_bytes ( const std::string &  target)

Returns the size of outgoing queue for the given channel.

Parameters
target    The name of the target endpoint.
Returns
The size of outgoing queue in bytes.

◆ hard_close_connection()

void yami::agent::hard_close_connection ( const std::string &  target)

Immediately closes the given communication channel.

Closes the channel identified by name.

The channel is closed immediately and those messages that are waiting in its outgoing queue are abandoned. Integrity of the message that was already partly transmitted is not guaranteed.

Parameters
target    The name of the target endpoint.

◆ open_connection() [1/2]

void yami::agent::open_connection ( const std::string &  target)

Opens the new connection.

Opens the new channel or does nothing if the channel already exists.

This function is not necessary when the automatic connection option (auto_connect) of send and send_one_way is used.

Parameters
target    The name of the target endpoint. This name should correspond to the listener name in some target agent object.

◆ open_connection() [2/2]

void yami::agent::open_connection ( const std::string &  target,
const parameters &  options 
)

Opens the new connection with overriding options.

Opens the new channel or does nothing if the channel already exists. If a new channel is created, the options that are defined in the options parameter override the agent's corresponding values.

This function is not necessary when the automatic connection option (auto_connect) of send and send_one_way is used.

Parameters
target     The name of the target endpoint. This name should correspond to the listener name in some target agent object.
options    The set of options that will override the agent's values.

◆ register_connection_event_monitor()

template<typename functor >
void yami::agent::register_connection_event_monitor ( functor &  f)
inline

Registers the monitor for connection-related events.

Registers the monitor for connection events.

Note: The monitor callback is intentionally not synchronized. Use this function after constructing the agent, but before opening any connections.

Parameters
f    The callable entity that can accept a std::string as the connection name and a connection_event as the event description.

◆ register_io_error_logger()

template<typename functor >
void yami::agent::register_io_error_logger ( functor &  f)
inline

Registers the logger for I/O errors.

Note: The logger callback is intentionally not synchronized. Use this function after constructing the agent, but before opening any connections.

Parameters
f    The callable entity that can accept an int as the error code and a const char * as the error description.

◆ register_object()

template<typename functor >
void yami::agent::register_object ( const std::string &  object_name,
functor &  f 
)
inline

Registers the new logical destination object.

Registers the new "object" that can be a logical destination for incoming messages.

Parameters
object_name    The name of the newly registered object. If an object with this name is already registered, the registration data is replaced.
f              The callable entity that can accept the incoming_message as the invocation parameter.
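
The sketch below shows one possible shape for such a callable entity. counting_handler and register_counter are hypothetical names, not part of YAMI4. The call operator is a template only so that the sketch compiles without the library headers; in real use it would be invoked with a yami::incoming_message. Because f is passed by non-const reference, the sketch assumes that the handler has to outlive its registration.

```cpp
#include <cstddef>
#include <string>

// Hypothetical handler, not part of YAMI4. It only counts the messages
// it is given. The plain counter is adequate for a single dispatcher
// thread; with several dispatchers it would presumably need to be
// atomic or otherwise protected.
struct counting_handler
{
    std::size_t received = 0;

    template <typename IncomingMessage>
    void operator()(IncomingMessage& /* message */)
    {
        ++received;
    }
};

// Hypothetical helper. Agent is expected to be yami::agent.
// Assumption: handler must stay alive while it is registered.
template <typename Agent>
void register_counter(
    Agent& a, const std::string& object_name, counting_handler& handler)
{
    // Registering the same name again replaces the earlier registration.
    a.register_object(object_name, handler);
}
```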

◆ register_value_publisher()

void yami::agent::register_value_publisher ( const std::string &  object_name,
value_publisher &  publisher 
)

Registers the value publisher as a new logical object.

Parameters
object_name    The name of the newly registered object. If an object with this name is already registered, the registration data is replaced.
publisher      The value publisher to be registered.

◆ remove_listener()

void yami::agent::remove_listener ( const std::string &  listener)

Removes existing listener.

Removes the listener denoted by its actual target name. Note that the actual target name might be different from the name provided when the listener was created, due to target resolution. The name which should be used for listener removal is the name that is returned by the add_listener function.

◆ run_dispatcher()

void yami::agent::run_dispatcher ( std::size_t  dispatcher_index)

Runs the message dispatching activities.

Runs the dispatcher activities on behalf of the agent. This function allows the application to provide its own thread of execution for the agent's needs, because with an RTOS the agent object does not explicitly create any threads. This function returns when the agent is destroyed or when an internal error occurs.

Parameters
dispatcher_index    The index of the dispatcher, starting from 0. As many dispatcher threads should call this function, with indices from 0 to N-1, as are defined in the options; N==1 by default.

◆ run_worker()

void yami::agent::run_worker ( )

Runs the internal worker activities.

Runs the internal worker activities on behalf of the agent. This function allows the application to provide its own thread of execution for the agent's needs, because with an RTOS the agent object does not explicitly create any threads. This function returns when the agent is destroyed or when an internal error occurs.
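
When the application supplies the threads, it needs one thread body for run_worker and one for each dispatcher index. The sketch below builds those bodies as plain callables, leaving the choice of threading primitive (for example std::thread or an RTOS task) to the application. make_thread_bodies is a hypothetical helper, not part of YAMI4, and Agent stands in for yami::agent.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical helper, not part of YAMI4. Builds one callable per
// thread that the application has to provide: one worker plus
// dispatcher_count dispatchers with indices 0 .. dispatcher_count-1.
// The agent is captured by reference and must outlive the callables.
template <typename Agent>
std::vector<std::function<void()>> make_thread_bodies(
    Agent& a, std::size_t dispatcher_count)
{
    std::vector<std::function<void()>> bodies;
    bodies.push_back([&a] { a.run_worker(); });
    for (std::size_t i = 0; i != dispatcher_count; ++i)
    {
        bodies.push_back([&a, i] { a.run_dispatcher(i); });
    }
    return bodies;
}
```

Each callable is meant to run on its own thread, and dispatcher_count should match the number of dispatchers configured in the agent's options.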

◆ send() [1/6]

std::unique_ptr< outgoing_message > yami::agent::send ( const std::string &  target,
const serializable &  header,
const serializable &  content,
outgoing_message_id  message_id,
std::size_t  priority = 0,
bool  auto_connect = true 
)

Sends the new outgoing message with a custom header.

Sends the new outgoing message to the given destination, with fully customized message header.

Parameters
target          The name of the target endpoint. This name should correspond to the listener name in some target agent object.
header          The header of the message.
content         The content of the message.
message_id      Message identifier.
priority        The priority of the message.
auto_connect    The flag controlling automatic (re)connection.
Returns
The outgoing_message object that allows tracking the progress of this message and its status, and obtaining the response data.

Note: See get_next_message_id.

Note: This function implicitly opens a new communication channel if it is not already open. The channel is kept open until it is explicitly closed (see the close_connection function), until the agent is destroyed, or until a communication error is detected.

◆ send() [2/6]

std::unique_ptr< outgoing_message > yami::agent::send ( const std::string &  target,
const std::string &  object_name,
const std::string &  message_name,
const serializable &  content = parameters(),
std::size_t  priority = 0,
bool  auto_connect = true 
)

Sends the new outgoing message.

Sends the new outgoing message to the given destination.

Parameters
target          The name of the target endpoint. This name should correspond to the listener name in some target agent object.
object_name     The name of the logical destination object in the target agent.
message_name    The name of the message.
content         The content of the message.
priority        The priority of the message.
auto_connect    The flag controlling automatic (re)connection.
Returns
The outgoing_message object that allows tracking the progress of this message and its status, and obtaining the response data.

Note: This function implicitly opens a new communication channel if it is not already open. The channel is kept open until it is explicitly closed (see the close_connection function), until the agent is destroyed, or until a communication error is detected.

◆ send() [3/6]

template<typename functor >
void yami::agent::send ( functor &  f,
const std::string &  target,
const serializable &  header,
const serializable &  content,
outgoing_message_id  message_id,
std::size_t  priority = 0,
bool  auto_connect = true 
)
inline

Sends the outgoing message with a custom header.

Sends the outgoing message to the given destination and executes the callback whenever the status of the message changes. The callback should accept the outgoing_message object as its parameter. Note: the callback should not block the processing unnecessarily.

This function behaves as the other version of send, except for the outgoing_message object creation policy.

◆ send() [4/6]

template<typename functor >
outgoing_message_id yami::agent::send ( functor &  f,
const std::string &  target,
const std::string &  object_name,
const std::string &  message_name,
const serializable &  content = parameters(),
std::size_t  priority = 0,
bool  auto_connect = true 
)
inline

Sends the outgoing message.

Sends the outgoing message to the given destination and executes the callback whenever the status of the message changes. The callback should accept the outgoing_message object as its parameter. Note: the callback should not block the processing unnecessarily.

This function behaves as the other version of send, except for the outgoing_message object creation policy.
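
The identifier returned by this overload is the value that clean_outgoing_message_callback expects. The sketch below pairs the two calls. send_tracked and stop_tracking are hypothetical helpers, not part of YAMI4; "printer" and "print" are placeholder object and message names; Agent stands in for yami::agent. The callback is passed by non-const reference, so the sketch assumes it has to stay alive until its resources are cleaned.

```cpp
#include <string>

// Hypothetical helper, not part of YAMI4. Sends a message whose status
// changes are reported to on_change and returns the message identifier
// (outgoing_message_id is a typedef for long long).
// "printer" and "print" are placeholder object and message names.
template <typename Agent, typename Callback>
long long send_tracked(
    Agent& a, Callback& on_change, const std::string& target)
{
    return a.send(on_change, target, "printer", "print");
}

// Hypothetical helper: releases the agent's resources for the callback
// once the application no longer expects it to be executed.
template <typename Agent>
void stop_tracking(Agent& a, long long message_id)
{
    a.clean_outgoing_message_callback(message_id);
}
```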

◆ send() [5/6]

void yami::agent::send ( outgoing_message &  message,
const std::string &  target,
const serializable &  header,
const serializable &  content,
outgoing_message_id  message_id,
std::size_t  priority = 0,
bool  auto_connect = true 
)

Sends the outgoing message with a custom header.

Sends the outgoing message to the given destination and reinitializes the message object in-place. Allows the use of fully customized headers.

This function behaves as the other version of send, except for the outgoing_message object creation policy.

◆ send() [6/6]

void yami::agent::send ( outgoing_message &  message,
const std::string &  target,
const std::string &  object_name,
const std::string &  message_name,
const serializable &  content = parameters(),
std::size_t  priority = 0,
bool  auto_connect = true 
)

Sends the outgoing message.

Sends the outgoing message to the given destination and reinitializes the message object in-place.

This function behaves as the other version of send, except for the outgoing_message object creation policy.

◆ send_one_way() [1/2]

void yami::agent::send_one_way ( const std::string &  target,
const serializable &  header,
const serializable &  content,
outgoing_message_id  message_id,
std::size_t  priority = 0,
bool  auto_connect = true 
)

Sends the new outgoing message with a custom header.

Sends the new outgoing message to the given destination, without the possibility of tracking its progress and with a fully customized message header.

See the description and notes for the send function.

◆ send_one_way() [2/2]

void yami::agent::send_one_way ( const std::string &  target,
const std::string &  object_name,
const std::string &  message_name,
const serializable &  content = parameters(),
std::size_t  priority = 0,
bool  auto_connect = true 
)

Sends the new outgoing message.

Sends the new outgoing message to the given destination, without the possibility of tracking its progress.

See the description and notes for the send function.

◆ unregister_object()

void yami::agent::unregister_object ( const std::string &  object_name)

Unregisters the logical destination object.

It is permitted to request unregistration of an object that does not exist; such an operation has no effect.

Note: Due to performance and design tradeoffs, it is not guaranteed that no more messages will ever be dispatched to the given object once this function returns. In fact, some of the messages that have been received by the agent and not yet dispatched might still be dispatched shortly after this function returns. Only those messages that are received by the agent after this function returns are guaranteed not to be dispatched to the unregistered object. This might be particularly important with regard to the lifetime of the callable entity that was provided when the given object was registered.

Parameters
object_name    The name of the object to be unregistered.

The documentation for this class was generated from the following file: