Library for Agent job control on agent side (ta_job)
Overview
Library to be used by backends for TAPI job. More…
// typedefs typedef struct ta_job_manager_t ta_job_manager_t; typedef enum ta_job_status_type_t ta_job_status_type_t; typedef struct ta_job_status_t ta_job_status_t; typedef struct ta_job_buffer_t ta_job_buffer_t; typedef enum ta_job_wrapper_priority_t ta_job_wrapper_priority_t; // enums enum ta_job_status_type_t; enum ta_job_wrapper_priority_t; // structs struct ta_job_buffer_t; struct ta_job_status_t; // global functions te_errno ta_job_manager_init(ta_job_manager_t** manager); te_errno ta_job_create(ta_job_manager_t* manager, const char* spawner, const char* tool, char** argv, char** env, unsigned int* job_id); te_errno ta_job_start(ta_job_manager_t* manager, unsigned int id); te_errno ta_job_allocate_channels(ta_job_manager_t* manager, unsigned int job_id, bool input_channels, unsigned int n_channels, unsigned int* channels); void ta_job_deallocate_channels(ta_job_manager_t* manager, unsigned int n_channels, unsigned int* channels); te_errno ta_job_attach_filter(ta_job_manager_t* manager, const char* filter_name, unsigned int n_channels, unsigned int* channels, bool readable, te_log_level log_level, unsigned int* filter_id); te_errno ta_job_filter_add_regexp(ta_job_manager_t* manager, unsigned int filter_id, char* re, unsigned int extract); te_errno ta_job_filter_add_channels(ta_job_manager_t* manager, unsigned int filter_id, unsigned int n_channels, unsigned int* channels); te_errno ta_job_filter_remove_channels(ta_job_manager_t* manager, unsigned int filter_id, unsigned int n_channels, unsigned int* channels); te_errno ta_job_poll(ta_job_manager_t* manager, unsigned int n_channels, unsigned int* channel_ids, int timeout_ms, bool filter_only); te_errno ta_job_receive(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t* buffer); te_errno ta_job_receive_last(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t* buffer); te_errno ta_job_receive_many(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t** buffers, unsigned int* count); te_errno ta_job_clear(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters); te_errno ta_job_send(ta_job_manager_t* manager, unsigned int channel_id, size_t count, uint8_t* buf); te_errno ta_job_kill(ta_job_manager_t* manager, unsigned int job_id, int signo); te_errno ta_job_killpg(ta_job_manager_t* manager, unsigned int job_id, int signo); te_errno ta_job_wait(ta_job_manager_t* manager, unsigned int job_id, int timeout_ms, ta_job_status_t* status); te_errno ta_job_stop(ta_job_manager_t* manager, unsigned int job_id, int signo, int term_timeout_ms); te_errno ta_job_destroy(ta_job_manager_t* manager, unsigned int job_id, int term_timeout_ms); te_errno ta_job_wrapper_add(ta_job_manager_t* manager, const char* tool, char** argv, unsigned int job_id, ta_job_wrapper_priority_t priority, unsigned int* wrapper_id); te_errno ta_job_wrapper_delete(ta_job_manager_t* manager, unsigned int job_id, unsigned int wrapper_id); te_errno ta_job_add_exec_param(ta_job_manager_t* manager, unsigned int job_id, te_exec_param* exec_params); // macros #define TA_JOB_BUFFER_INIT
Detailed Documentation
Library to be used by backends for TAPI job. The library provides types and functions that should be used by TAPI job backends for agent job management.
Typedefs
typedef struct ta_job_manager_t ta_job_manager_t
Job manager instance which represents a particular TAPI job backend
typedef enum ta_job_status_type_t ta_job_status_type_t
Cause of job’s completion
typedef struct ta_job_status_t ta_job_status_t
Structure that represents status of a completed job
typedef struct ta_job_buffer_t ta_job_buffer_t
A structure to store messages produced by the job
typedef enum ta_job_wrapper_priority_t ta_job_wrapper_priority_t
Wrappers priority level
Global Functions
te_errno ta_job_manager_init(ta_job_manager_t** manager)
Initialize a job manager. You must ensure that the function is called before any backend function which uses the job manager, i.e. the manager must not be used uninitialized.
Parameters:
manager |
Job manager handle |
Returns:
Status code
te_errno ta_job_create(ta_job_manager_t* manager, const char* spawner, const char* tool, char** argv, char** env, unsigned int* job_id)
Create a job
argv
and env
are owned by the function on success and must not be modified or freed by the caller.
Parameters:
manager |
Job manager handle |
spawner |
Spawner plugin name (may be |
tool |
Program path to run |
argv |
Program arguments (last item is |
env |
Program environment (last item is |
job_id |
ID of the created job |
Returns:
Status code
te_errno ta_job_start(ta_job_manager_t* manager, unsigned int id)
Start a job
Parameters:
manager |
Job manager handle |
id |
ID of the job to be started |
Returns:
Status code
te_errno ta_job_allocate_channels(ta_job_manager_t* manager, unsigned int job_id, bool input_channels, unsigned int n_channels, unsigned int* channels)
Allocate n_channels
channels. This function is supposed to be called only once for each job.
Memory for channels
must be allocated before calling this function
Parameters:
manager |
Job manager handle |
job_id |
ID of the job for which to allocate the channels |
input_channels |
|
n_channels |
Number of channels to allocate |
channels |
Array of channels to be filled with ids of the allocated channels |
Returns:
Status code
void ta_job_deallocate_channels(ta_job_manager_t* manager, unsigned int n_channels, unsigned int* channels)
Deallocate n_channels
channels
Parameters:
manager |
Job manager handle |
n_channels |
Number of channels to deallocate |
channels |
Array of channels to be deallocated |
te_errno ta_job_attach_filter(ta_job_manager_t* manager, const char* filter_name, unsigned int n_channels, unsigned int* channels, bool readable, te_log_level log_level, unsigned int* filter_id)
Attach filter to specified output channels
Parameters:
manager |
Job manager handle |
filter_name |
Name of the filter, may be |
n_channels |
Number of channels to attach the filter to |
channels |
IDs of output channels to attach the filter to |
readable |
Whether or not it will be possible to get an output of the filter via ta_job_receive() |
log_level |
If non-zero, the output of the filter is logged with a given log level |
filter_id |
ID of the attached filter |
Returns:
Status code
te_errno ta_job_filter_add_regexp(ta_job_manager_t* manager, unsigned int filter_id, char* re, unsigned int extract)
Add a regular expression to a filter. The filter will only store output that matches the regular expression.
Multi-segment matching is performed. Thus, PCRE_MULTILINE option is set.
Parameters:
manager |
Job manager handle |
filter_id |
ID of the filter to attach the regex to |
re |
PCRE-style regular expression |
extract |
Index of a capturing group to extract from the matched output. |
Returns:
Status code
te_errno ta_job_filter_add_channels(ta_job_manager_t* manager, unsigned int filter_id, unsigned int n_channels, unsigned int* channels)
Attach an existing filter to additional output channels
Parameters:
manager |
Job manager handle |
filter_id |
ID of the filter to attach |
n_channels |
Number of channels to attach the filter to |
channels |
IDs of output channels to attach the filter to |
Returns:
Status code
te_errno ta_job_filter_remove_channels(ta_job_manager_t* manager, unsigned int filter_id, unsigned int n_channels, unsigned int* channels)
Remove filter from specified output channels. For instance, if the filter is attached to stdout and stderr and the function is called with only stdout specified, the filter will continue working with stderr. Either the function succeeds and the filter is removed from all specified channels, or the function fails and the filter is not removed from any channel.
If the filter is removed from every channel it was attached to, all associated resources will be freed, so ta_job_receive() will fail
Parameters:
manager |
Job manager handle |
filter_id |
ID of the filter to remove |
n_channels |
Number of channels to remove the filter from |
channels |
IDs of output channels to remove the filter from |
Returns:
Status code
te_errno ta_job_poll(ta_job_manager_t* manager, unsigned int n_channels, unsigned int* channel_ids, int timeout_ms, bool filter_only)
Poll channels/filters for readiness. A filter is considered ready if it is readable and has some data that can be got via ta_job_receive(). A channel is considered ready if it is an input channel and it is ready to accept input.
Parameters:
manager |
Job manager handle |
n_channels |
Number of channels to poll |
channel_ids |
IDs of channels to poll |
timeout_ms |
Timeout to wait for readiness |
filter_only |
|
0 |
One of the channels/filters is ready |
TE_ETIMEDOUT |
None of the channels/filters was ready within |
Returns:
Status code
te_errno ta_job_receive(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t* buffer)
Receive the first message from one of the filters and remove it from the filter’s message queue.
Parameters:
manager |
Job manager handle |
n_filters |
Number of filters to receive from |
filters |
ID of filters to receive from |
timeout_ms |
Time to wait for data to appear on some filter |
buffer |
Buffer to store the received data |
Returns:
Status code
te_errno ta_job_receive_last(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t* buffer)
Receive the last non-eos message (if there is one) from one of the filters. If the only message contains eos, read it. The message is not removed from the filter’s message queue, it can still be read with ta_job_receive().
Parameters:
manager |
Job manager handle |
n_filters |
Number of filters to receive from |
filters |
ID of filters to receive from |
timeout_ms |
Time to wait for data to appear on some filter |
buffer |
Buffer to store the received data |
Returns:
Status code
te_errno ta_job_receive_many(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t** buffers, unsigned int* count)
Receive multiple messages at once from the specified filters. This function may retrieve some messages and return error if attempt to read the next message failed.
Parameters:
manager |
Job manager handle |
n_filters |
Number of filters to receive from |
filters |
ID of filters to receive from |
timeout_ms |
Time to wait for data to appear on some filter |
buffers |
Where to save pointer to array of buffers with messages |
count |
On input, maximum number of messages to retrieve. If zero, all available messages will be retrieved. On output - number of actually retrieved messages. |
Returns:
Status code
te_errno ta_job_clear(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters)
Remove all messages from the filters’ message queues
Parameters:
manager |
Job manager handle |
n_filters |
Number of filters to be cleared |
filters |
IDs of filters to be cleared |
Returns:
Status code
te_errno ta_job_send(ta_job_manager_t* manager, unsigned int channel_id, size_t count, uint8_t* buf)
Send data to a job through an input channel
Parameters:
manager |
Job manager handle |
channel_id |
ID of the input channel |
count |
Number of bytes to be sent |
buf |
Data to be sent |
Returns:
Status code
te_errno ta_job_kill(ta_job_manager_t* manager, unsigned int job_id, int signo)
Send a signal to a job
Parameters:
manager |
Job manager handle |
job_id |
ID of the job to send signal to |
signo |
Number of the signal to send |
Returns:
Status code
te_errno ta_job_killpg(ta_job_manager_t* manager, unsigned int job_id, int signo)
Send a signal to a job’s process group
Parameters:
manager |
Job manager handle |
job_id |
ID of the job to send signal to |
signo |
Number of the signal to send |
Returns:
Status code
te_errno ta_job_wait(ta_job_manager_t* manager, unsigned int job_id, int timeout_ms, ta_job_status_t* status)
Wait for a job completion (or check its status if timeout_ms
is zero)
Parameters:
manager |
Job manager handle |
job_id |
ID of the job to wait for |
timeout_ms |
Time to wait for the job. |
status |
Job exit status location, may be |
0 |
The job completed running |
TE_EINPROGRESS |
The job is still running |
TE_ECHILD |
The job was never started |
Returns:
Status code
te_errno ta_job_stop(ta_job_manager_t* manager, unsigned int job_id, int signo, int term_timeout_ms)
Stop a job. It can be started over with ta_job_start(). The function tries to terminate the job with the specified signal. If the signal fails to terminate the job withing term_timeout_ms
, the function will send SIGKILL
.
Parameters:
manager |
Job manager handle |
job_id |
ID of the job to stop |
signo |
Number of the signal to send. If |
term_timeout_ms |
The timeout of graceful termination of a job, if it has been running. After the timeout expiration the job will be killed with |
Returns:
Status code
te_errno ta_job_destroy(ta_job_manager_t* manager, unsigned int job_id, int term_timeout_ms)
Destroy a job instance. If the job has started, it is terminated as gracefully as possible. All resources of the instance are freed, all unread data on all filters are lost.
Parameters:
manager |
Job manager handle |
job_id |
ID of the job to destroy |
term_timeout_ms |
The timeout of graceful termination of a job, if it has been running. After the timeout expiration the job will be killed with |
Returns:
Status code
te_errno ta_job_wrapper_add(ta_job_manager_t* manager, const char* tool, char** argv, unsigned int job_id, ta_job_wrapper_priority_t priority, unsigned int* wrapper_id)
Add a wrapper for a job that is not running
The wrapper must be added after the job is created
The function can be called several times. Wrappers will be added to the main tool from right to left according to the priority levels.
Parameters:
manager |
Job manager handle |
tool |
Path to the wrapper tool |
argv |
Wrapper arguments (last item must be |
job_id |
ID of the job to add the wrapper to |
priority |
Wrapper priority |
wrapper_id |
ID of the added wrapper |
Returns:
Status code
te_errno ta_job_wrapper_delete(ta_job_manager_t* manager, unsigned int job_id, unsigned int wrapper_id)
Delete a wrapper
There is no necessity to delete wrappers before job destruction. All wrappers are removed automatically together with the job.
Parameters:
manager |
Job manager handle |
job_id |
ID of a job to which the wrapper belongs |
wrapper_id |
ID of a wrapper to delete |
Returns:
Status code
te_errno ta_job_add_exec_param(ta_job_manager_t* manager, unsigned int job_id, te_exec_param* exec_params)
Add a process parameter for a job
Parameters:
manager |
Job manager handle |
job_id |
ID of the job to add the parameters to |
exec_params |
Array of process parameters. The last element must have the type set to TE_EXEC_END and the data set to |
Returns:
Status code
Macros
#define TA_JOB_BUFFER_INIT
Default initializer for ta_job_buffer_t