Library for Agent job control on agent side (ta_job)

Overview

Library to be used by backends for TAPI job. More…

// typedefs

typedef struct ta_job_manager_t ta_job_manager_t;
typedef enum ta_job_status_type_t ta_job_status_type_t;
typedef struct ta_job_status_t ta_job_status_t;
typedef struct ta_job_buffer_t ta_job_buffer_t;
typedef enum ta_job_wrapper_priority_t ta_job_wrapper_priority_t;

// enums

enum ta_job_status_type_t;
enum ta_job_wrapper_priority_t;

// structs

struct ta_job_buffer_t;
struct ta_job_status_t;

// global functions

te_errno ta_job_manager_init(ta_job_manager_t** manager);
te_errno ta_job_create(ta_job_manager_t* manager, const char* spawner, const char* tool, char** argv, char** env, unsigned int* job_id);
te_errno ta_job_start(ta_job_manager_t* manager, unsigned int id);
te_errno ta_job_allocate_channels(ta_job_manager_t* manager, unsigned int job_id, bool input_channels, unsigned int n_channels, unsigned int* channels);
void ta_job_deallocate_channels(ta_job_manager_t* manager, unsigned int n_channels, unsigned int* channels);
te_errno ta_job_attach_filter(ta_job_manager_t* manager, const char* filter_name, unsigned int n_channels, unsigned int* channels, bool readable, te_log_level log_level, unsigned int* filter_id);
te_errno ta_job_filter_add_regexp(ta_job_manager_t* manager, unsigned int filter_id, char* re, unsigned int extract);
te_errno ta_job_filter_add_channels(ta_job_manager_t* manager, unsigned int filter_id, unsigned int n_channels, unsigned int* channels);
te_errno ta_job_filter_remove_channels(ta_job_manager_t* manager, unsigned int filter_id, unsigned int n_channels, unsigned int* channels);
te_errno ta_job_poll(ta_job_manager_t* manager, unsigned int n_channels, unsigned int* channel_ids, int timeout_ms, bool filter_only);
te_errno ta_job_receive(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t* buffer);
te_errno ta_job_receive_last(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t* buffer);
te_errno ta_job_receive_many(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t** buffers, unsigned int* count);
te_errno ta_job_clear(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters);
te_errno ta_job_send(ta_job_manager_t* manager, unsigned int channel_id, size_t count, uint8_t* buf);
te_errno ta_job_kill(ta_job_manager_t* manager, unsigned int job_id, int signo);
te_errno ta_job_killpg(ta_job_manager_t* manager, unsigned int job_id, int signo);
te_errno ta_job_wait(ta_job_manager_t* manager, unsigned int job_id, int timeout_ms, ta_job_status_t* status);
te_errno ta_job_stop(ta_job_manager_t* manager, unsigned int job_id, int signo, int term_timeout_ms);
te_errno ta_job_destroy(ta_job_manager_t* manager, unsigned int job_id, int term_timeout_ms);
te_errno ta_job_wrapper_add(ta_job_manager_t* manager, const char* tool, char** argv, unsigned int job_id, ta_job_wrapper_priority_t priority, unsigned int* wrapper_id);
te_errno ta_job_wrapper_delete(ta_job_manager_t* manager, unsigned int job_id, unsigned int wrapper_id);
te_errno ta_job_add_exec_param(ta_job_manager_t* manager, unsigned int job_id, te_exec_param* exec_params);

// macros

#define TA_JOB_BUFFER_INIT

Detailed Documentation

Library to be used by backends for TAPI job. The library provides types and functions that should be used by TAPI job backends for agent job management.

Typedefs

typedef struct ta_job_manager_t ta_job_manager_t

Job manager instance which represents a particular TAPI job backend

typedef enum ta_job_status_type_t ta_job_status_type_t

Cause of job’s completion

typedef struct ta_job_status_t ta_job_status_t

Structure that represents status of a completed job

typedef struct ta_job_buffer_t ta_job_buffer_t

A structure to store messages produced by the job

typedef enum ta_job_wrapper_priority_t ta_job_wrapper_priority_t

Wrapper priority level

Global Functions

te_errno ta_job_manager_init(ta_job_manager_t** manager)

Initialize a job manager. You must ensure that the function is called before any backend function which uses the job manager, i.e. the manager must not be used uninitialized.

Parameters:

manager

Job manager handle

Returns:

Status code

te_errno ta_job_create(ta_job_manager_t* manager, const char* spawner, const char* tool, char** argv, char** env, unsigned int* job_id)

Create a job

argv and env are owned by the function on success and must not be modified or freed by the caller.

Parameters:

manager

Job manager handle

spawner

Spawner plugin name (may be NULL for the default plugin)

tool

Program path to run

argv

Program arguments (last item is NULL)

env

Program environment (last item is NULL). May be NULL to keep the current environment.

job_id

ID of the created job

Returns:

Status code

te_errno ta_job_start(ta_job_manager_t* manager, unsigned int id)

Start a job

Parameters:

manager

Job manager handle

id

ID of the job to be started

Returns:

Status code

te_errno ta_job_allocate_channels(ta_job_manager_t* manager, unsigned int job_id, bool input_channels, unsigned int n_channels, unsigned int* channels)

Allocate n_channels channels. This function is supposed to be called only once for each job.

Memory for channels must be allocated before calling this function

Parameters:

manager

Job manager handle

job_id

ID of the job for which to allocate the channels

input_channels

true to allocate input channels, false to allocate output channels

n_channels

Number of channels to allocate

channels

Array of channels to be filled with ids of the allocated channels

Returns:

Status code

void ta_job_deallocate_channels(ta_job_manager_t* manager, unsigned int n_channels, unsigned int* channels)

Deallocate n_channels channels

Parameters:

manager

Job manager handle

n_channels

Number of channels to deallocate

channels

Array of channels to be deallocated

te_errno ta_job_attach_filter(ta_job_manager_t* manager, const char* filter_name, unsigned int n_channels, unsigned int* channels, bool readable, te_log_level log_level, unsigned int* filter_id)

Attach filter to specified output channels

Parameters:

manager

Job manager handle

filter_name

Name of the filter, may be NULL

n_channels

Number of channels to attach the filter to

channels

IDs of output channels to attach the filter to

readable

Whether or not it will be possible to get an output of the filter via ta_job_receive()

log_level

If non-zero, the output of the filter is logged with a given log level

filter_id

ID of the attached filter

Returns:

Status code

te_errno ta_job_filter_add_regexp(ta_job_manager_t* manager, unsigned int filter_id, char* re, unsigned int extract)

Add a regular expression to a filter. The filter will only store output that matches the regular expression.

Multi-segment matching is performed. Thus, PCRE_MULTILINE option is set.

Parameters:

manager

Job manager handle

filter_id

ID of the filter to attach the regex to

re

PCRE-style regular expression

extract

Index of a capturing group to extract from the matched output. 0 means to extract the whole match.

Returns:

Status code

te_errno ta_job_filter_add_channels(ta_job_manager_t* manager, unsigned int filter_id, unsigned int n_channels, unsigned int* channels)

Attach an existing filter to additional output channels

Parameters:

manager

Job manager handle

filter_id

ID of the filter to attach

n_channels

Number of channels to attach the filter to

channels

IDs of output channels to attach the filter to

Returns:

Status code

te_errno ta_job_filter_remove_channels(ta_job_manager_t* manager, unsigned int filter_id, unsigned int n_channels, unsigned int* channels)

Remove filter from specified output channels. For instance, if the filter is attached to stdout and stderr and the function is called with only stdout specified, the filter will continue working with stderr. Either the function succeeds and the filter is removed from all specified channels, or the function fails and the filter is not removed from any channel.

If the filter is removed from every channel it was attached to, all associated resources will be freed, so ta_job_receive() will fail

Parameters:

manager

Job manager handle

filter_id

ID of the filter to remove

n_channels

Number of channels to remove the filter from

channels

IDs of output channels to remove the filter from

Returns:

Status code

te_errno ta_job_poll(ta_job_manager_t* manager, unsigned int n_channels, unsigned int* channel_ids, int timeout_ms, bool filter_only)

Poll channels/filters for readiness. A filter is considered ready if it is readable and has some data that can be got via ta_job_receive(). A channel is considered ready if it is an input channel and it is ready to accept input.

Parameters:

manager

Job manager handle

n_channels

Number of channels to poll

channel_ids

IDs of channels to poll

timeout_ms

Timeout to wait for readiness

filter_only

true if only filters are provided. In this case, if one of the ids is actually an id of a channel, not a filter, TE_EPERM will be returned.

0

One of the channels/filters is ready

TE_ETIMEDOUT

None of the channels/filters was ready within timeout_ms

Returns:

Status code

te_errno ta_job_receive(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t* buffer)

Receive the first message from one of the filters and remove it from the filter’s message queue.

Parameters:

manager

Job manager handle

n_filters

Number of filters to receive from

filters

IDs of filters to receive from

timeout_ms

Time to wait for data to appear on some filter

buffer

Buffer to store the received data

Returns:

Status code

te_errno ta_job_receive_last(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t* buffer)

Receive the last non-eos message (if there is one) from one of the filters. If the only message contains eos, read it. The message is not removed from the filter’s message queue, it can still be read with ta_job_receive().

Parameters:

manager

Job manager handle

n_filters

Number of filters to receive from

filters

IDs of filters to receive from

timeout_ms

Time to wait for data to appear on some filter

buffer

Buffer to store the received data

Returns:

Status code

te_errno ta_job_receive_many(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters, int timeout_ms, ta_job_buffer_t** buffers, unsigned int* count)

Receive multiple messages at once from the specified filters. This function may retrieve some messages and still return an error if an attempt to read the next message failed.

Parameters:

manager

Job manager handle

n_filters

Number of filters to receive from

filters

IDs of filters to receive from

timeout_ms

Time to wait for data to appear on some filter

buffers

Where to save pointer to array of buffers with messages

count

On input, maximum number of messages to retrieve. If zero, all available messages will be retrieved. On output - number of actually retrieved messages.

Returns:

Status code

te_errno ta_job_clear(ta_job_manager_t* manager, unsigned int n_filters, unsigned int* filters)

Remove all messages from the filters’ message queues

Parameters:

manager

Job manager handle

n_filters

Number of filters to be cleared

filters

IDs of filters to be cleared

Returns:

Status code

te_errno ta_job_send(ta_job_manager_t* manager, unsigned int channel_id, size_t count, uint8_t* buf)

Send data to a job through an input channel

Parameters:

manager

Job manager handle

channel_id

ID of the input channel

count

Number of bytes to be sent

buf

Data to be sent

Returns:

Status code

te_errno ta_job_kill(ta_job_manager_t* manager, unsigned int job_id, int signo)

Send a signal to a job

Parameters:

manager

Job manager handle

job_id

ID of the job to send signal to

signo

Number of the signal to send

Returns:

Status code

te_errno ta_job_killpg(ta_job_manager_t* manager, unsigned int job_id, int signo)

Send a signal to a job’s process group

Parameters:

manager

Job manager handle

job_id

ID of the job to send signal to

signo

Number of the signal to send

Returns:

Status code

te_errno ta_job_wait(ta_job_manager_t* manager, unsigned int job_id, int timeout_ms, ta_job_status_t* status)

Wait for a job completion (or check its status if timeout_ms is zero)

Parameters:

manager

Job manager handle

job_id

ID of the job to wait for

timeout_ms

Time to wait for the job. 0 means to check current status and exit, negative value means that the call of the function blocks until the job changes its status.

status

Job exit status location, may be NULL

0

The job completed running

TE_EINPROGRESS

The job is still running

TE_ECHILD

The job was never started

Returns:

Status code

te_errno ta_job_stop(ta_job_manager_t* manager, unsigned int job_id, int signo, int term_timeout_ms)

Stop a job. It can be started over with ta_job_start(). The function tries to terminate the job with the specified signal. If the signal fails to terminate the job within term_timeout_ms, the function will send SIGKILL.

Parameters:

manager

Job manager handle

job_id

ID of the job to stop

signo

Number of the signal to send. If signo is SIGKILL, it will be sent only once.

term_timeout_ms

The timeout of graceful termination of a job, if it has been running. After the timeout expiration the job will be killed with SIGKILL. Negative means default timeout.

Returns:

Status code

te_errno ta_job_destroy(ta_job_manager_t* manager, unsigned int job_id, int term_timeout_ms)

Destroy a job instance. If the job has started, it is terminated as gracefully as possible. All resources of the instance are freed, all unread data on all filters are lost.

Parameters:

manager

Job manager handle

job_id

ID of the job to destroy

term_timeout_ms

The timeout of graceful termination of a job, if it has been running. After the timeout expiration the job will be killed with SIGKILL. Negative means default timeout.

Returns:

Status code

te_errno ta_job_wrapper_add(ta_job_manager_t* manager, const char* tool, char** argv, unsigned int job_id, ta_job_wrapper_priority_t priority, unsigned int* wrapper_id)

Add a wrapper for a job that is not running

The wrapper must be added after the job is created

The function can be called several times. Wrappers will be added to the main tool from right to left according to the priority levels.

Parameters:

manager

Job manager handle

tool

Path to the wrapper tool

argv

Wrapper arguments (last item must be NULL)

job_id

ID of the job to add the wrapper to

priority

Wrapper priority

wrapper_id

ID of the added wrapper

Returns:

Status code

te_errno ta_job_wrapper_delete(ta_job_manager_t* manager, unsigned int job_id, unsigned int wrapper_id)

Delete a wrapper

There is no necessity to delete wrappers before job destruction. All wrappers are removed automatically together with the job.

Parameters:

manager

Job manager handle

job_id

ID of a job to which the wrapper belongs

wrapper_id

ID of a wrapper to delete

Returns:

Status code

te_errno ta_job_add_exec_param(ta_job_manager_t* manager, unsigned int job_id, te_exec_param* exec_params)

Add a process parameter for a job

Parameters:

manager

Job manager handle

job_id

ID of the job to add the parameters to

exec_params

Array of process parameters. The last element must have the type set to TE_EXEC_END and the data set to NULL.

Returns:

Status code

Macros

#define TA_JOB_BUFFER_INIT

Default initializer for ta_job_buffer_t