YAMI4 Core Library 2.0.0
Messaging Solution for Distributed Systems
yami::core::agent Class Reference

Message broker.

#include <agent.h>

Public Member Functions

 agent ()
 Constructor.
 
result init (incoming_message_dispatch_function dispatch_callback, void *dispatch_hint, closed_connection_function disconnection_hook=NULL, void *disconnection_hook_hint=NULL, allocator *alloc=NULL)
 Initialization.
 
result init (const parameters &configuration_options, incoming_message_dispatch_function dispatch_callback, void *dispatch_hint, closed_connection_function disconnection_hook=NULL, void *disconnection_hook_hint=NULL, allocator *alloc=NULL)
 Initialization.
 
void install_event_notifications (event_notification_function event_notification_callback, void *event_notification_hint)
 Installation of logging notifications callback.
 
void install_frame_acceptor (frame_acceptor_function frame_acceptor, void *frame_acceptor_hint)
 Installation of frame acceptor callback.
 
void install_io_error_logger (io_error_function io_error_callback, void *io_error_callback_hint)
 Installation of I/O error logging callback.
 
void clean ()
 Cleanup.
 
 ~agent ()
 Destructor.
 
result open (const char *target)
 Creates new channel for the given target.
 
result open (const char *target, channel_descriptor &cd, bool &created_new_channel)
 Creates new channel for the given target.
 
result open (const char *target, channel_descriptor &cd, bool &created_new_channel, const parameters *overriding_options)
 Creates new channel for the given target with a set of overriding options.
 
result is_open (const char *target, channel_descriptor &existing_channel) const
 Checks if the given channel is already open.
 
result close (channel_descriptor cd, std::size_t priority=0)
 Closes the given channel.
 
result close (const char *target, std::size_t priority=0)
 Closes the given channel.
 
result hard_close (channel_descriptor cd)
 Immediately closes the given channel.
 
result hard_close (const char *target)
 Immediately closes the given channel.
 
result post (channel_descriptor cd, const serializable &message_header, const serializable &message_body, std::size_t priority=0, message_progress_function progress_callback=NULL, void *progress_hint=NULL)
 Posts new message for sending.
 
result post (const char *target, const serializable &message_header, const serializable &message_body, std::size_t priority=0, message_progress_function progress_callback=NULL, void *progress_hint=NULL)
 Posts new message for sending.
 
result add_listener (const char *target, new_incoming_connection_function connection_hook=NULL, void *connection_hook_hint=NULL, const char **resolved_target=NULL)
 Adds new listener.
 
result remove_listener (const char *target)
 Removes existing listener.
 
result do_some_work (std::size_t timeout, bool allow_outgoing_traffic=true, bool allow_incoming_traffic=true)
 Performs a portion of I/O or internal management work.
 
result interrupt_work_waiter ()
 Artificially interrupts the wait state of do_some_work.
 
void get_channel_usage (std::size_t &max_allowed, std::size_t &used)
 Returns the selector's channel usage counters.
 
result get_pending_outgoing_bytes (channel_descriptor cd, std::size_t &bytes)
 Returns the size of outgoing queue for the given channel.
 
result get_pending_outgoing_bytes (const char *target, std::size_t &bytes)
 

Detailed Description

Message broker.

The message broker that encapsulates physical channel management, incoming and outgoing message queues, listeners and resource management.

A single agent object can manage many listeners, which are responsible for accepting remote connections, and many incoming and outgoing connections.

The agent objects can be created and destroyed without constraints on the stack, on the free store or as static objects.

Constructor & Destructor Documentation

◆ agent()

yami::core::agent::agent ( )

Constructor.

Creates an uninitialized agent object.

The only valid operations that can be executed after constructing a new agent object are:

  • init - to initialize it to the operational state
  • clean - which is an empty operation in this case
  • destruction

◆ ~agent()

yami::core::agent::~agent ( )

Destructor.

Calls clean.

Member Function Documentation

◆ add_listener()

result yami::core::agent::add_listener ( const char *  target,
new_incoming_connection_function  connection_hook = NULL,
void *  connection_hook_hint = NULL,
const char **  resolved_target = NULL 
)

Adds new listener.

Adds a new listener for the given target address.

The supported target formats are:

  • "tcp://host:port" for TCP/IP connections, where host can be provided in the symbolic or numeric form
  • "tcp://*:port" for TCP/IP connections, for "any" local address
  • "tcp://port" for TCP/IP connections, for "any" local address
  • "udp://host:port" for UDP communication, with rules as for TCP/IP
  • "unix://path" for Unix connections
  • "qnx://*" for QNX native messaging

The port for TCP/IP and UDP protocols can be 0 or *, in which case the actual port number is assigned by the system.
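
To make the TCP/IP and UDP listener formats above concrete, the following is a minimal sketch of how such a target string decomposes into protocol, host and port. It is not the library's own parser; the listener_target struct and the parse_ip_listener_target function are hypothetical names introduced only for this illustration, and the sketch deliberately ignores the "unix://" and "qnx://" formats.

```cpp
#include <string>

// Hypothetical helper type, not part of the library.
struct listener_target
{
    std::string protocol; // text before "://", here "tcp" or "udp"
    std::string host;     // "*" stands for "any" local address
    std::string port;     // "0" or "*" lets the system assign the port
};

// Splits "tcp://host:port", "tcp://*:port" and "tcp://port" (and the
// udp equivalents). Returns false for anything else.
bool parse_ip_listener_target(const std::string & target,
    listener_target & out)
{
    const std::string::size_type sep = target.find("://");
    if (sep == std::string::npos)
    {
        return false;
    }

    const std::string protocol = target.substr(0, sep);
    if (protocol != "tcp" && protocol != "udp")
    {
        return false;
    }

    const std::string rest = target.substr(sep + 3);
    const std::string::size_type colon = rest.rfind(':');

    out.protocol = protocol;
    if (colon == std::string::npos)
    {
        // Port-only form: the host defaults to "any" local address.
        out.host = "*";
        out.port = rest;
    }
    else
    {
        out.host = rest.substr(0, colon);
        out.port = rest.substr(colon + 1);
    }

    return !out.host.empty() && !out.port.empty();
}
```

When the port is given as 0 or *, the port actually bound is only known after the call, which is what the resolved_target parameter is for.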

Parameters
  • target - The target for the new listener.
  • connection_hook - Pointer to the user-defined function that will be called when there is a new incoming connection accepted by this listener. This function can intercept and reject the incoming connection. This callback can be NULL.
  • connection_hook_hint - Any parameter that will be passed to the connection_hook function.
  • resolved_target - The pointer to actual (resolved) target after the given target is bound. If NULL, the resolved target is not propagated back to caller.
Returns
  • ok if the operation was successful or the appropriate problem description

Note: The agent does not check whether the given target is already in use. Depending on the listener protocol, such a conflict can be reported as an I/O error.

◆ clean()

void yami::core::agent::clean ( )

Cleanup.

Cleans up the dependent structures and closes all physical connections and listeners. After calling this function the agent object is back in the uninitialized state and can be reused by initializing it again.

The memory structures are cleaned up (deallocated) only if each of them was separately allocated on the global store. If the internal structures were created with externally provided working area (that is, when working_area != NULL when the agent was initialized) then no memory cleanup is performed.

◆ close() [1/2]

result yami::core::agent::close ( channel_descriptor  cd,
std::size_t  priority = 0 
)

Closes the given channel.

Closes the channel identified by the given descriptor.

The priority controls how the messages that are still waiting in the outgoing queue are handled. Existing messages with lower priority are abandoned, whereas existing messages with priority equal to or higher than the one provided as parameter are retained in the outgoing queue and are pushed for transmission before the channel is physically closed. The channel is closed immediately only if there are no messages waiting in its outgoing queue.
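
The priority rule can be illustrated with a small, self-contained model of an outgoing queue. This is only a sketch of the documented behaviour, not the agent's implementation; queued_message and apply_close_request are hypothetical names used for the illustration.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical stand-in for one entry of a channel's outgoing queue.
struct queued_message
{
    std::string label;
    std::size_t priority;
};

// Models the documented effect of a close request: messages with a
// priority lower than close_priority are abandoned, the others stay
// queued in their original order. Returns true when nothing is left,
// that is, when the channel could be closed immediately.
bool apply_close_request(std::deque<queued_message> & queue,
    std::size_t close_priority)
{
    queue.erase(
        std::remove_if(queue.begin(), queue.end(),
            [close_priority](const queued_message & m)
            {
                return m.priority < close_priority;
            }),
        queue.end());

    return queue.empty();
}
```

Under this reading, a close request with the default priority of 0 abandons nothing, because no message can have a priority lower than 0.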

Parameters
  • cd - Descriptor of the channel that should be closed.
  • priority - Priority of the close request, applied against the messages already waiting in the outgoing queue.
Returns
  • ok if the operation was successful or the appropriate problem description

◆ close() [2/2]

result yami::core::agent::close ( const char *  target,
std::size_t  priority = 0 
)

Closes the given channel.

Closes the channel identified by the target name.

See the other close function for the description of arguments, return values and the discussion of priorities.

◆ do_some_work()

result yami::core::agent::do_some_work ( std::size_t  timeout,
bool  allow_outgoing_traffic = true,
bool  allow_incoming_traffic = true 
)

Performs a portion of I/O or internal management work.

Performs a portion of work with the given timeout. If there is some pending work at the time of the call, it is performed immediately and the function returns without waiting for further work; otherwise the call blocks, waiting for work for at most the given timeout.

The pending work can include any of:

  • any of the listeners is ready to accept new connection
  • any of the channels is ready for reading data
  • any of the channels is ready for output operation and there are pending frames in its outgoing queue
  • there was some change in the internal data structures that needs to be acted upon

Parameters
  • timeout - Timeout in milliseconds.
  • allow_outgoing_traffic - Flow control flag.
  • allow_incoming_traffic - Flow control flag.

Note: All callbacks initiated by the agent are executed in the context of the thread that calls this function. The thread calling this function is also the only one that performs actual data transfer.

Note: The timeout value is subject to system limits as defined for the select function.

Note: In the typical usage scenario this function should be called in a tight loop.
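
The loop mentioned in the last note might look like the sketch below. To keep it self-contained it runs against a stub rather than a real agent: work_result, stub_agent and run_worker are hypothetical names, and the idea that a timeout is reported as a separate, non-fatal result value is an assumption that should be checked against the library's result type.

```cpp
#include <atomic>
#include <cstddef>

// Stand-in for the library's result type (assumption: a timeout is
// reported as its own value and is not an error).
enum class work_result { ok, timed_out, io_error };

// Hypothetical stub with the same do_some_work shape as documented
// above. It reports ok a fixed number of times, then an error.
struct stub_agent
{
    int remaining;
    int calls;

    work_result do_some_work(std::size_t /* timeout */,
        bool /* allow_outgoing_traffic */ = true,
        bool /* allow_incoming_traffic */ = true)
    {
        ++calls;
        if (remaining > 0)
        {
            --remaining;
            return work_result::ok;
        }
        return work_result::io_error;
    }
};

// Calls do_some_work repeatedly from the calling thread until stop is
// set or a result other than ok / timed_out is returned.
template <typename Agent>
work_result run_worker(Agent & a, const std::atomic<bool> & stop,
    std::size_t timeout_ms)
{
    while (!stop.load())
    {
        const work_result r = a.do_some_work(timeout_ms);
        if (r != work_result::ok && r != work_result::timed_out)
        {
            return r; // error handling is left to the caller
        }
    }
    return work_result::ok;
}
```

Because the stop flag is only checked between calls, a thread that sets it would presumably also call interrupt_work_waiter so that a blocked do_some_work returns early instead of waiting for the full timeout.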

◆ hard_close() [1/2]

result yami::core::agent::hard_close ( channel_descriptor  cd)

Immediately closes the given channel.

Closes the channel identified by the given descriptor.

The channel is closed immediately and those messages that are waiting in its outgoing queue are abandoned. Integrity of the message that was already partly transmitted is not guaranteed.

Parameters
  • cd - Descriptor of the channel that should be closed.
Returns
  • ok if the operation was successful or the appropriate problem description

◆ hard_close() [2/2]

result yami::core::agent::hard_close ( const char *  target)

Immediately closes the given channel.

Closes the channel identified by the target name.

See the other hard_close function for the description.

◆ init() [1/2]

result yami::core::agent::init ( const parameters &  configuration_options,
incoming_message_dispatch_function  dispatch_callback,
void *  dispatch_hint,
closed_connection_function  disconnection_hook = NULL,
void *  disconnection_hook_hint = NULL,
allocator *  alloc = NULL 
)

Initialization.

Initializes the agent object to the operational state with runtime options that can override default settings.

See the other init function for the description of parameters.

◆ init() [2/2]

result yami::core::agent::init ( incoming_message_dispatch_function  dispatch_callback,
void *  dispatch_hint,
closed_connection_function  disconnection_hook = NULL,
void *  disconnection_hook_hint = NULL,
allocator *  alloc = NULL 
)

Initialization.

Initializes the agent object to the operational state with default values for runtime options.

Parameters
  • dispatch_callback - Pointer to the user-defined function that will be called when the new incoming message is completed. This callback can be NULL.
  • dispatch_hint - Any parameter that will be passed to the dispatch_callback function.
  • disconnection_hook - Pointer to the user-defined function that will be called when the existing connection is closed. The closing connection can be either the one that was explicitly created or the one that was automatically added by one of the listeners. In either case, the reason for closing the connection is passed to the callback as well. This callback can be NULL.
  • disconnection_hook_hint - Any parameter that will be passed to the disconnection_hook function.
  • alloc - If not NULL, provides a custom allocator to use. Otherwise the default (malloc/free) allocator is used.
Returns
  • ok if the operation was successful or the appropriate problem description

◆ install_event_notifications()

void yami::core::agent::install_event_notifications ( event_notification_function  event_notification_callback,
void *  event_notification_hint 
)

Installation of logging notifications callback.

Installs the logging monitor with the given hint. The previously installed callback (if any) is overridden.

Note: This function should be called after init. This function is not synchronized.

◆ install_frame_acceptor()

void yami::core::agent::install_frame_acceptor ( frame_acceptor_function  frame_acceptor,
void *  frame_acceptor_hint 
)

Installation of frame acceptor callback.

Installs the frame acceptor callback with the given hint. The previously installed acceptor (if any) is overridden. The acceptor callback is invoked for each incoming frame. If any exception is thrown from the acceptor callback, the frame in question will be dropped.

Note: This function should be called after init and before opening the channels that need this functionality. This function is not synchronized. The frame acceptor callback is called from the context where the given channel should not be tampered with.

◆ install_io_error_logger()

void yami::core::agent::install_io_error_logger ( io_error_function  io_error_callback,
void *  io_error_callback_hint 
)

Installation of I/O error logging callback.

Installs the I/O error logging callback with the given hint. The previously installed callback (if any) is overridden.

Note: This function should be called after init. This function is not synchronized.

◆ is_open()

result yami::core::agent::is_open ( const char *  target,
channel_descriptor &  existing_channel 
) const

Checks if the given channel is already open.

Checks whether a channel for the given target is already open and, if so, provides its descriptor.

Parameters
  • target - The target name to check.
  • existing_channel - The descriptor that is filled so that it refers to the found channel, if it exists.
Returns
  • ok if the channel is already open
  • no_such_name if the channel does not exist
  • channel_closed if the channel is in the closing state

◆ open() [1/3]

result yami::core::agent::open ( const char *  target)

Creates new channel for the given target.

Creates a new channel for the given target. If the channel already exists for the given target, this function does nothing.

The supported target formats are:

  • "tcp://host:port" for TCP/IP connections, where host can be provided in the symbolic or numeric form
  • "udp://host:port" for UDP communication, where host can be provided in the symbolic or numeric form
  • "unix://path" for Unix connections, where path can be relative or absolute
  • "file://filename" or "file://filename?write" for writing to regular files
  • "file://filename?read" for reading from regular files
  • "file://filename?append" for appending to regular files
  • "qnx://node:pid:chid" for QNX native messaging

If the library is compiled with OpenSSL support, the additional target format is:

  • "tcps://host:port" for SSL connections

Parameters
  • target - The target for the new connection.
Returns
  • ok if the operation was successful or the appropriate problem description

◆ open() [2/3]

result yami::core::agent::open ( const char *  target,
channel_descriptor &  cd,
bool &  created_new_channel 
)

Creates new channel for the given target.

Creates a new channel for the given target or finds the existing channel with the same target.

See the other open function for the description of valid target formats.

Parameters
  • target - The target for the new connection.
  • cd - The descriptor that is filled so that it refers to the newly created or found channel.
  • created_new_channel - Set to true if a new channel was created.
Returns
  • ok if the operation was successful or the appropriate problem description

◆ open() [3/3]

result yami::core::agent::open ( const char *  target,
channel_descriptor &  cd,
bool &  created_new_channel,
const parameters *  overriding_options 
)

Creates new channel for the given target with a set of overriding options.

If the new channel is created, it will use the overriding options instead of the ones used by the agent. See the other overload for description.

◆ post() [1/2]

result yami::core::agent::post ( channel_descriptor  cd,
const serializable &  message_header,
const serializable &  message_body,
std::size_t  priority = 0,
message_progress_function  progress_callback = NULL,
void *  progress_hint = NULL 
)

Posts new message for sending.

Posts a new message to the outgoing queue of the given channel.

The message is composed of two sets of parameters, one for the header information and one for the body. This distinction is supposed to support arbitrary routing conventions defined by user code. Any of these parts can be empty.

The priority of the message is taken into account for proper ordering of the frames in the outgoing queue - frames created for messages with higher priority will be transmitted before frames having lower priority. Messages with equal priority are ordered according to the FIFO regime.
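
The ordering rule can be sketched with a simple insertion routine: a new frame goes in front of the first queued frame that has a strictly lower priority, which keeps frames of equal priority in arrival order. This is only a model of the documented ordering; pending_frame and enqueue are hypothetical names, and the model does not account for details such as a frame whose transmission has already started.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a frame waiting in the outgoing queue.
struct pending_frame
{
    std::string label;
    std::size_t priority;
};

// Inserts f before the first entry with a strictly lower priority.
// Higher priorities therefore end up closer to the front, and entries
// with the same priority keep their FIFO order.
void enqueue(std::vector<pending_frame> & queue, const pending_frame & f)
{
    const auto pos = std::find_if(queue.begin(), queue.end(),
        [&f](const pending_frame & queued)
        {
            return queued.priority < f.priority;
        });

    queue.insert(pos, f);
}
```

For example, enqueueing a (priority 1), b (5), c (1) and d (5) in that order leaves the queue as b, d, a, c.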

A callback function can be provided to allow the user code to trace the progress of the message. For each frame that was successfully pushed for physical transmission, the callback is invoked with the number of bytes transmitted so far from the beginning of the message and the total number of bytes in the whole message. When these two arguments are equal, the whole message has been transmitted. If both are zero, there was an error and the message was abandoned.
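
A progress callback typically has to tell these three cases apart. The sketch below shows one way to do so. The callback shape (hint, sent bytes, total bytes) is an assumption made for this illustration and should be checked against the actual message_progress_function declaration; send_state, classify_progress and on_progress are hypothetical names.

```cpp
#include <cstddef>

enum class send_state { in_progress, completed, abandoned };

// Interprets the two counters as described above. The zero/zero case
// is tested first, because it would otherwise look like "completed".
send_state classify_progress(std::size_t sent_bytes,
    std::size_t total_bytes)
{
    if (sent_bytes == 0 && total_bytes == 0)
    {
        return send_state::abandoned;
    }
    if (sent_bytes == total_bytes)
    {
        return send_state::completed;
    }
    return send_state::in_progress;
}

// Assumed callback shape. The hint is expected to point at a
// send_state variable owned by the caller.
void on_progress(void * hint, std::size_t sent_bytes,
    std::size_t total_bytes)
{
    send_state * state = static_cast<send_state *>(hint);
    if (state != nullptr)
    {
        *state = classify_progress(sent_bytes, total_bytes);
    }
}
```

The address of the state variable would be passed as progress_hint, so that the code that posted the message can inspect the outcome later.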

Parameters
  • cd - Descriptor of the channel that should be used for sending.
  • message_header - The parameters object containing (arbitrary) header information - this object can be empty.
  • message_body - The parameters object containing (arbitrary) body information - this object can be empty.
  • priority - Priority of the message, which determines its position relative to the frames already in the outgoing queue.
  • progress_callback - Pointer to the user-defined function that will be called for tracking transmission progress. This callback can be NULL.
  • progress_hint - Any parameter that will be passed to the progress_callback function.

◆ post() [2/2]

result yami::core::agent::post ( const char *  target,
const serializable &  message_header,
const serializable &  message_body,
std::size_t  priority = 0,
message_progress_function  progress_callback = NULL,
void *  progress_hint = NULL 
)

Posts new message for sending.

Posts a new message to the outgoing queue of the given channel where the channel is identified by its target.

See the other post function for the description of arguments and their semantics.

◆ remove_listener()

result yami::core::agent::remove_listener ( const char *  target)

Removes existing listener.

Removes the listener denoted by its actual target name. Note that the actual target name might be different from the name provided when the listener was created, due to target resolution.

Parameters
  • target - Target identifying the listener to remove. If the listener is not found, this function does nothing.
Returns
  • ok if the operation was successful or the appropriate problem description

The documentation for this class was generated from the following file: