Task Executor
The Task Executor allows you to customize the processing of tasks, such as software updates and remote configuration changes.
Tasks are processed based on scripts written in the Lua language. Their execution is triggered asynchronously, after receiving a REST request.
The Task Executor can be seamlessly integrated with the Gateway Agent's message bus (northbound and southbound interfaces).
For a step-by-step guide on how to install the Task Executor, see the Installation Guide.
Configuring the Task Executor
The deployment configuration file of the Task Executor is located at:
/etc/relayr/gwa-task-executor-cpp/gwa-task-executor-config.json
To modify the file, edit it in a text editor.
The file contains the following configuration settings:
mqtt_connection:
Configuration Option | Description | Default Value |
---|---|---|
host | Hostname or IP address of MQTT broker. | 127.0.0.1 |
port | Port number for connecting to MQTT broker. | 1883 |
keepalive | MQTT connection keep alive interval. | 20 |
max_inflight_messages | The maximum number of QoS 1 or 2 messages that can be in the process of being transmitted simultaneously. | 100 |
qos_levels | Quality of Service level for each traffic type. | measurements: 0, alerts: 1, configuration: 1, metadata: 1 |
http_connection:
Configuration Option | Description | Default Value |
---|---|---|
host | Hostname or IP address of the Task Executor's HTTP interface. | 127.0.0.1 |
port | Port number of the Task Executor interface. | 22080 |
use_ssl | Determines whether the connection to the Task Executor's HTTP interface requires SSL. | false |
ca_file | Path to SSL Certificate Authorities file. | /etc/ssl/certs/ca-certificates.crt |
auth_enable | Determines if the Task Executor's HTTP interface requires authorization. | false |
username | Username to the Task Executor's HTTP interface. | "" |
password | Password to the Task Executor's HTTP interface. | "" |
http_timeout | Timeout of the Task Executor's HTTP request, in seconds. If missing, the default value of 10 seconds is assumed. | 10 |
health_monitor:
Configuration Option | Description | Default Value |
---|---|---|
heartbeat_interval | Time interval at which the periodic heartbeat messages are sent, given in seconds. For details, see the Edge Platform Monitoring section. | 60 |
engine - Settings related to task scripts and their execution:
Configuration Option | Description | Default Value |
---|---|---|
manifest_file | Path to the file which contains a set of scripts that define the task execution logic. For more information, see the Task Manifest File section. | task-manifest.json |
executor_pool_size | Number of worker threads which execute scripts. | 5 |
task_timeout | Number of hours after which an uncompleted task is timed out. If set to 0, the task timeout is disabled. | 0 |
task_timeout_check_period | Time interval, given in minutes, at which the Task Executor checks if there are any uncompleted tasks older than the task_timeout value. | |
task_log_severity | Severity of the built-in logger of task scripts. One of: TRACE, DEBUG, INFO, WARNING, ERROR, CRITICAL. You can override this global setting with a task-specific severity defined in the task manifest file. When you change this setting, you must also change the global logging level of the Task Executor accordingly. For more information, see the Adding Debug Mode to Task Scripts section. | INFO |
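As a rough illustration of how the task_timeout setting behaves, the following Lua sketch decides whether a task counts as timed out. The helper name is_timed_out and the idea that a task's age is measured from a creation timestamp in milliseconds are assumptions made for this example; this guide does not describe how the Task Executor computes task age internally.

```lua
-- Hypothetical helper, not part of the Task Executor API.
local MS_PER_HOUR = 60 * 60 * 1000

-- created_ms, now_ms: timestamps in milliseconds since the Unix epoch.
-- task_timeout_hours: value of the task_timeout setting.
local function is_timed_out(created_ms, now_ms, task_timeout_hours)
    -- A task_timeout of 0 disables the timeout entirely.
    if task_timeout_hours == 0 then
        return false
    end
    return (now_ms - created_ms) > task_timeout_hours * MS_PER_HOUR
end
```

Under these assumptions, a check like this would run once every task_timeout_check_period minutes, so a task may be reported as timed out somewhat later than the exact task_timeout value.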
database:
Configuration Option | Description | Default Value |
---|---|---|
path | Location of the Task Executor database. To store tasks in RAM, set its value to ":memory:". | ./task-executor.db |
max_task_entries | Maximum number of task entries stored in the tasks table. If the task count reaches this limit, the oldest tasks are removed. | 10000 |
When you edit the configuration file, restart the Task Executor to apply your changes:
systemctl restart gwa-task-executor-cpp
For an example of the Task Executor configuration file, see the Configuration File Example section.
Creating Task Scripts
Scripts written in Lua define the logic of processing tasks. The list of scripts executed by the Task Executor is included in the task manifest file.
To include a script in the Task Executor, follow these steps:
1. Create a .lua file defining the logic of task processing.
2. In the task manifest file, include a path to the .lua script file.
See the sections below for information on:
- Task Manifest File - Explanation and example of how to create a task manifest file.
- Supported Lua Functions - List of high-level functions implemented by the Task Executor to streamline script creation.
- Examples of Task Scripts - Examples of scripts and their description.
Task Manifest File
The task manifest file is a .json file that contains a set of task scripts executed by the Task Executor.
The default file name is task-manifest.json, but you can configure a different path to the file in the manifest_file setting of the Task Executor configuration file.
The task manifest file has the following structure:
{
    "<unique-task-name-1>": {
        "file": "<path-to-lua-file-1>"
    },
    "<unique-task-name-2>": {
        "file": "<path-to-lua-file-2>"
    }
}
For each task script, add the following information to the task manifest file:
- Task name - User-defined string which uniquely identifies the task, e.g. pkg-task.
  Task names provided in the task manifest file must be mapped to task types supported in a given project. The Task Executor executes a task of a given type when it receives a REST request referring to the corresponding task type.
- Task body - The file field with a path to the task body file, e.g. examples/pkg-mgr.lua.
Optionally, you can also define the following settings for a task script:
- task_log_severity - Severity of the built-in logger of task scripts. One of: TRACE, DEBUG, INFO, WARNING, ERROR, CRITICAL. For more information, see the Adding Debug Mode to Task Scripts section.
  If defined for a script, the task_log_severity overrides the global setting configured in the engine section of the configuration file.
- resumable - If set to true, a task can be resumed after the component's restart. When the Task Executor starts, the resume() function is called for any instance of that task present in the database with the finished flag set to false. For an example, see the Task Forwarding Script section.
Example:
{
    "package-task": {
        "file": "examples/pkg-mgr.lua",
        "task_log_severity": "ERROR"
    },
    "config-task": {
        "file": "examples/task-forward.lua",
        "resumable": true
    },
    "installation-task": {
        "file": "examples/fota-task.lua",
        "task_log_severity": "TRACE"
    }
}
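To make the override rule for task_log_severity concrete, here is a minimal Lua sketch that resolves the severity a script ends up with. The function effective_severity is hypothetical, and the manifest is written as a Lua table purely for illustration; the Task Executor itself reads the JSON file and may resolve the setting differently.

```lua
-- Severity names accepted by the task_log_severity setting.
local VALID = {
    TRACE = true, DEBUG = true, INFO = true,
    WARNING = true, ERROR = true, CRITICAL = true,
}

-- Hypothetical helper: a per-task severity wins over the global one.
local function effective_severity(entry, global_severity)
    local severity = entry.task_log_severity or global_severity
    assert(VALID[severity], "unknown severity: " .. tostring(severity))
    return severity
end

-- Two entries from the manifest example, expressed as a Lua table.
local manifest = {
    ["package-task"] = { file = "examples/pkg-mgr.lua", task_log_severity = "ERROR" },
    ["config-task"] = { file = "examples/task-forward.lua", resumable = true },
}

print(effective_severity(manifest["package-task"], "INFO")) -- ERROR
print(effective_severity(manifest["config-task"], "INFO"))  -- INFO
```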
Supported Lua Functions
The task body is executed by an embedded Lua interpreter v5.3. Additionally, each task has global utility variables and functions:
Name | Description |
---|---|
GWA_JSON_NULL | Use this to set null in JSONs generated by the Lua code of the task script. |
gwaData | Variable initialized when a task execution is triggered by a REST request. Contains the task details. |
gwaDeviceId | Variable initialized when a task execution is triggered by a REST request. Contains the device ID. |
gwaPublishTaskUpdate(string deviceId, string id, string type, boolean finished, table details) | Publishes the task update data. |
gwaPublishTask(did, id, taskType, gwaData) | Submits a task for execution, e.g. forwards the task details to the southbound interface. |
gwaDownloadFile(string url, string identifier, string md5, string auth, integer retries) | Tries to download a file to the local storage. Returns a pair of a boolean result and a string with an error message (nil in case of success). |
gwaRemoveFile(string identifier) | Removes a file from the local storage. |
gwaExtractFromTopic(integer position) | Extracts a token at the n-th position from a topic. |
gwaGetTimestamp() | Returns the current number of milliseconds since the Unix epoch. |
gwaLogTrace(string logEntry) | Logs a message with the TRACE severity. |
gwaLogDebug(string logEntry) | Logs a message with the DEBUG severity. |
gwaLogInfo(string logEntry) | Logs a message with the INFO severity. |
gwaLogWarning(string logEntry) | Logs a message with the WARNING severity. |
gwaLogError(string logEntry) | Logs a message with the ERROR severity. |
gwaLogCritical(string logEntry) | Logs a message with the CRITICAL severity. |
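The sketch below shows one way the functions listed above might be combined: it downloads a file, checks the boolean/message pair returned by gwaDownloadFile, and builds a details table for a task update. The helper download_and_report is hypothetical, and the stub definitions at the top exist only so the sketch runs in a plain Lua 5.3 interpreter; inside the Task Executor the gwa* globals are already provided, and their real behavior may differ from these stubs.

```lua
-- Stand-ins for running outside the Task Executor (illustrative only).
gwaDownloadFile = gwaDownloadFile or function(url, identifier, md5, auth, retries)
    if url == "" then
        return false, "empty url"
    end
    return true, nil
end
gwaLogInfo = gwaLogInfo or print
gwaLogError = gwaLogError or print
gwaGetTimestamp = gwaGetTimestamp or function() return os.time() * 1000 end

-- Hypothetical helper: download a file and describe the outcome.
local function download_and_report(url, identifier, md5)
    -- Empty auth string and 3 retries are example values.
    local ok, err = gwaDownloadFile(url, identifier, md5, "", 3)
    local details = { ts = gwaGetTimestamp() }
    if ok then
        gwaLogInfo("Downloaded " .. identifier)
        details.status = "success"
    else
        gwaLogError("Download failed: " .. tostring(err))
        details.status = "error"
        details.message = err
    end
    return ok, details
end
```

The returned details table could then be passed to gwaPublishTaskUpdate together with the device ID, task ID, and task type.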
Adding Debug Mode to Task Scripts
The Task Executor allows you to add logs with various severity levels to task scripts.
To log information about the task:
1. In the .lua script, define which information to log, using the gwaLog... functions. For a list of available functions, see the Supported Lua Functions section.
2. Configure a task_log_severity. Available options: TRACE, DEBUG, INFO, WARNING, ERROR, CRITICAL. You can define both a global task_log_severity in the engine section of the configuration file and a log severity for individual task scripts in the task manifest file.
   A task_log_severity defined for an individual script overrides the global task_log_severity setting.
You can find task script logs in the Task Executor logs.
The task_log_severity setting, which defines the logging level for the Lua logger, must be aligned with the logging level for the whole Task Executor (see here for information on how to change it). These two settings form two hierarchy levels of logging, where the Lua logger is a child logger of the global Task Executor logger.
Example: For a message with the DEBUG severity to show in the logs, the following chain of actions must take place:
1. Lua logs a message at the DEBUG level.
2. Lua logger's verbosity level is at least DEBUG (task_log_severity setting in the configuration file).
3. The message is forwarded to the root logger of the whole Task Executor.
4. The Task Executor's service verbosity is at least DEBUG.
5. The message is written to the logs.
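The chain above amounts to two filters applied in sequence. The following Lua sketch models it under the assumption that both loggers use the same severity scale; the function reaches_logs and the numeric ranks are illustrative and not part of the Task Executor.

```lua
-- Severity names ordered from most to least verbose.
local LEVELS = { TRACE = 1, DEBUG = 2, INFO = 3, WARNING = 4, ERROR = 5, CRITICAL = 6 }

-- Returns true when a message passes both the Lua logger filter
-- (task_log_severity) and the service-wide filter.
local function reaches_logs(message_severity, task_log_severity, service_severity)
    local rank = LEVELS[message_severity]
    return rank >= LEVELS[task_log_severity] and rank >= LEVELS[service_severity]
end

print(reaches_logs("DEBUG", "DEBUG", "DEBUG")) -- true
print(reaches_logs("DEBUG", "DEBUG", "INFO"))  -- false: dropped by the service logger
```

In this model, lowering task_log_severity alone is not enough: the second filter still discards the message unless the service level is at least as verbose.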
Importing External Lua Libraries and Custom Helper Functions
The Task Executor allows you to import external Lua libraries and custom helper functions.
You can add information about external modules to the Lua interpreter in three different ways, described in the sections below.
For more information on importing external Lua modules, see the Lua documentation:
- sections about package.path and package.cpath
- section about the require function
Method 1 - External Code in the Standard Location
To use external Lua libraries or custom helper functions, follow these steps:
1. Place your external code in the standard location of a given platform.
   To find out what the standard location on a given platform is, use the interpreter to print the default content of package.path/package.cpath, e.g. using:
   lua5.3 -e "print(package.path)"
2. In your task script code, use the require function to specify what external modules you want to use, e.g.:
   require("test")
Method 2 - Modifying Environment Variables
To use external Lua libraries or custom helper functions, follow these steps:
1. Place your external code in a custom location.
2. Modify the environment variables LUA_PATH and LUA_CPATH.
   In this method, you must modify the environment of the Task Executor's process in which task scripts are executed to ensure the variables point to the proper location.
3. In your task script code, use the require function to specify what external modules you want to use, e.g.:
   require("test")
Method 3 - Setting Paths in a Task Script File
In this method, you must set the proper paths manually in every task script file.
To use external Lua libraries or custom helper functions, follow these steps:
1. Place your external code in a custom location.
2. In a task script file, set the package.path/package.cpath, e.g.:
   package.path = "/[folder]/?.lua"
   You must modify the package.path before you use the require function. Otherwise, your external module will not be imported.
3. In a task script file, use the require function to specify what external modules you want to use, e.g.:
   require("test")
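As a variation on Method 3, the sketch below prepends a directory to package.path instead of replacing it, so modules in the standard locations stay reachable, and wraps require in pcall so a missing module yields an error message rather than aborting the script. The helper names and the /opt/gwa-helpers directory are placeholders chosen for this example.

```lua
-- Hypothetical helper: add a directory to the front of the module search path.
local function prepend_module_dir(dir)
    local pattern = dir .. "/?.lua"
    -- Plain-text search (4th argument true) so "?" and "." are not treated
    -- as pattern characters; avoids adding the same entry twice.
    if not package.path:find(pattern, 1, true) then
        package.path = pattern .. ";" .. package.path
    end
end

-- Hypothetical helper: returns the module, or nil plus the error message.
local function try_require(name)
    local ok, result = pcall(require, name)
    if ok then
        return result
    end
    return nil, result
end

prepend_module_dir("/opt/gwa-helpers") -- placeholder directory
local helpers, err = try_require("test")
if not helpers then
    print("Module not loaded: " .. err)
end
```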
Examples of Task Scripts
This section describes the following examples of .lua task scripts:
- Package update task script - Handling tasks that aim to update the selected packages of the Gateway Agent's components. The input to the script is a package task and the output is the task update.
- Task forwarding script - Handling configuration update tasks (changing the configuration of BACnet devices remotely). The script enables forwarding tasks to the southbound interface and then forwarding the task update from the southbound interface to the northbound one.
- Firmware upgrade task script - Handling tasks that aim to upgrade the firmware of Pico devices. The script enables downloading update files to the local storage and forwarding them to the southbound interface. This allows Pico devices to download the file, perform the firmware upgrade and send the upgrade result, which is then forwarded to the northbound interface.
After the installation, you can find more script examples at:
/usr/share/relayr/gwa-task-executor-cpp/examples
Package Update Task Script
This section describes an example of a package update task script, which aims to update the selected packages of the Gateway Agent's components. The input to the script is a package task and the output is the task update.
Script
local auth = true
local uname = "<fillme>"
local password = "<fillme>"
local shouldRemove = false

function task()
    print("Received REST message:")
    did = gwaDeviceId
    id = gwaData["id"]
    taskType = gwaData["type"]
    finished = gwaData["finished"]
    details = gwaData["details"]
    name = details["name"]
    parameters = details["parameters"]
    -- pick the package URL and file name out of the task parameters
    for k, v in pairs(parameters) do
        print(string.format(" %s = %s (%s)", k, v, type(v)))
        if v["id"] == "url" then
            url = v["value"]
        end
        if v["id"] == "file_name" then
            fileName = v["value"]
        end
    end
    -- download package
    local wgetRet = nil
    if auth ~= true then
        wgetRet = os.execute("wget " .. url)
    else
        wgetRet = os.execute(string.format("wget --user %s --password %s %s",
            uname, password, url))
    end
    -- install package
    local dpkgRet = os.execute("dpkg -i " .. fileName)
    if wgetRet then
        print("Download success")
    end
    if dpkgRet then
        print("Installation success")
    else
        print("Installation failed")
    end
    -- declared outside the block so the result is visible to the print below
    local aptRet = nil
    if dpkgRet == nil then
        print("Tries to satisfy dependencies")
        aptRet = os.execute("apt --yes --fix-broken install")
        if aptRet then
            dpkgRet = true
        end
    end
    print(wgetRet, dpkgRet, aptRet)
    -- send task update
    details["ts"] = os.time(os.date("!*t"))
    if wgetRet and dpkgRet then
        details["status"] = "success"
        details["message"] = "Upgrade was successful"
    else
        details["status"] = "error"
        details["message"] = "Upgrade failed"
    end
    gwaPublishTaskUpdate(did, id, taskType, true, details)
    -- remove downloaded package
    if shouldRemove then
        print("Removing downloaded package...")
        os.execute("rm -rf " .. fileName)
    end
end
Description
While processing the package update task, the Task Executor performs the following actions:
1. Initiates the task() function when a REST request with a package update task is received.
   Based on the task type (taskType) received in the REST request, the Task Executor detects which script should be triggered, as defined in the Task Manifest File.
2. Displays information on the received REST message and maps it to variables:
   - gwaDeviceId - Containing the device ID.
   - gwaData - Containing information about the task (ID, type, information if the task is finished, task details).
   - details - Containing all additional information, such as the command name (name) or the task parameters (parameters), displayed as key/value pairs.
   The structure of the details field reflects the structure of a task in a given project.
3. Detects the package file name (file_name) and the URL (url) to the package, based on the key (id) / value (value) pairs included in parameters.
4. If authentication is enabled (auth = true), the username (uname) and password need to be provided.
5. Downloads the selected package.
6. Starts the installation process, trying to satisfy dependencies, if needed, and notifies about the installation result.
7. Publishes the task update (gwaPublishTaskUpdate(did, id, taskType, true, details)) with the following information:
   - did - Device ID, string.
   - id - Task ID, string.
   - taskType - Task type, string.
   - true - Information that the task has finished.
   - details - All additional information, including the timestamp and the task status and message, which depend on the installation result (success or error).
8. Deletes the downloaded package after the installation if shouldRemove is set to true.
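The parameter detection step can also be written as a small lookup helper. The sketch below is an alternative to the loop in the script, not the script's own code; index_parameters is a hypothetical name, and it assumes parameters is an array of tables with id and value fields, as in the script above. The file name is a sample value.

```lua
-- Hypothetical helper: turn the parameters array into an id -> value table.
local function index_parameters(parameters)
    local by_id = {}
    for _, param in ipairs(parameters) do
        by_id[param.id] = param.value
    end
    return by_id
end

-- Sample input shaped like the parameters of a package update task.
local params = index_parameters({
    { id = "url", value = "http://10.10.6.116/1.deb" },
    { id = "file_name", value = "1.deb" },
})

print(params.url, params.file_name)
```

With such a table, a missing parameter shows up as nil (for example params.signature), which is easy to check before calling os.execute.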
Task Forwarding Script
This section describes an example for handling configuration update tasks (changing the configuration of BACnet devices remotely). The script publishes tasks to the southbound interface and then forwards the task update from the southbound interface to the northbound one.
A recommended way of handling register value changes is by using configurable business parameters. To do so, implement a Rule Engine's rule that detects if the business parameters have changed and generates a configuration task, which is then handled by the Task Executor. See here for a rule example.
Remember to include the same task name in the task manifest file as you define in the Rule Engine's rule that generates configuration tasks.
Script
function task()
    print("Received REST message")
    did = gwaDeviceId
    id = gwaData["id"]
    taskType = gwaData["type"]
    finished = gwaData["finished"]
    details = gwaData["details"]
    -- publish task to SB interface
    gwaPublishTask(did, id, taskType, details)
end

function update()
    print("Received MQTT message")
    did = gwaDeviceId
    id = gwaData["id"]
    taskType = gwaData["type"]
    finished = gwaData["finished"]
    details = gwaData["details"]
    details["ts"] = os.time(os.date("*t")) * 1000
    -- forward task update from SB to NB
    gwaPublishTaskUpdate(did, id, taskType, finished, details)
end

function timeout()
    print("Task timeout")
    did = gwaDeviceId
    id = gwaData["id"]
    taskType = gwaData["type"]
    finished = true
    details = gwaData["details"]
    details["message"] = "Task timeout"
    details["ts"] = os.time(os.date("*t")) * 1000
    details["status"] = "error"
    -- forward task update from SB to NB
    gwaPublishTaskUpdate(did, id, taskType, finished, details)
end

function resume()
    print("Task resumed")
    did = gwaDeviceId
    id = gwaData["id"]
    taskType = gwaData["type"]
    finished = gwaData["finished"]
    details = gwaData["details"]
    -- republish task to SB interface
    gwaPublishTask(did, id, taskType, details)
end
Description
While processing this task, the Task Executor performs the following actions:
1. Initiates the task() function when a REST request with a configuration update task is received.
   Based on the task type (taskType) received in the REST request, the Task Executor detects which script should be triggered, as defined in the Task Manifest File.
2. Displays information about the received REST message and maps it to variables:
   - gwaDeviceId - Containing the device ID.
   - gwaData - Containing information about the task (ID, type, information if the task is finished, task details).
   - details - Containing all additional information, such as the command name or the task parameters, displayed as key/value pairs.
   The structure of the details field reflects the structure of a task in a given project.
3. Publishes the task to the southbound interface (gwaPublishTask(did, id, taskType, details)).
4. Receives the task update information on the southbound interface (function update()) and displays the task details, reported by the BACnet Protocol Adapter:
   - did - Device ID, string.
   - id - Task ID, string.
   - taskType - Task type, string.
   - finished - Information whether the task has finished.
   - details - All additional information.
5. Forwards the task update to the northbound interface (gwaPublishTaskUpdate(did, id, taskType, finished, details)).
6. If the task is timed out, the timeout() function is called to forward information about the timeout to the northbound interface.
7. If the task is marked as resumable in the task manifest file, the resume() function is called to republish the unfinished task to the southbound interface in case of the Task Executor's restart.
Firmware Upgrade Task Script
This section describes an example for handling firmware upgrade tasks for Pico devices. The script enables downloading an update file to the local storage and forwarding it to the southbound interface. This allows Pico devices to download the file, perform the firmware upgrade and send the upgrade result, which is then forwarded to the northbound interface.
On the southbound interface, firmware upgrade tasks are handled by the Pico Adapter, responsible for communication between the Gateway Agent and Pico devices.
Script
function task()
    print("Received REST message")
    local did = gwaDeviceId
    local id = gwaData["id"]
    local taskType = gwaData["type"]
    local details = gwaData["details"]
    local parameters = details["parameters"]
    local fileUrl = nil
    local fileIdentifier = nil
    local fileHash = nil
    local authentication = nil
    for it, param in ipairs(parameters) do
        if param["id"] == "url" then
            fileUrl = param["value"]
        elseif param["id"] == "file_name" then
            fileIdentifier = param["value"]
        elseif param["id"] == "signature" then
            fileHash = param["value"]
        elseif param["id"] == "authentication" then
            authentication = param["value"]
        end
    end
    if fileUrl == nil then
        abort_task(did, id, taskType, details, "url parameter not found")
        return
    end
    if fileIdentifier == nil then
        abort_task(did, id, taskType, details, "file_name parameter not found")
        return
    end
    if fileHash == nil then
        abort_task(did, id, taskType, details, "signature parameter not found")
        return
    end
    if authentication == nil then
        authentication = ""
    end
    local status, message = gwaDownloadFile(fileUrl, fileIdentifier, fileHash, authentication, 5)
    if (status) then
        forward_task(did, id, taskType, details)
    else
        abort_task(did, id, taskType, details, message)
    end
end

function forward_task(did, id, taskType, details)
    -- publish task to SB interface
    gwaPublishTask(did, id, taskType, details)
end

function abort_task(did, id, taskType, details, reason)
    details["status"] = "error"
    details["message"] = reason
    gwaPublishTaskUpdate(did, id, taskType, true, details)
end

function update()
    print("Received MQTT message")
    did = gwaDeviceId
    id = gwaData["id"]
    taskType = gwaData["type"]
    finished = gwaData["finished"]
    details = gwaData["details"]
    -- forward task update from SB to NB
    gwaPublishTaskUpdate(did, id, taskType, finished, details)
end
Description
While processing this task, the Task Executor performs the following actions:
1. Initiates the task() function when a REST request with a firmware upgrade task is received.
   Based on the task type (taskType) received in the REST request, the Task Executor detects which script should be triggered, as defined in the Task Manifest File.
2. Displays information about the received REST message and maps it to variables:
   - gwaDeviceId - Containing the device ID.
   - gwaData - Containing information about the task (task ID, type, and details).
   - details - Containing all additional information, including the task parameters:
     - file name
     - file URL
     - file signature
     - authentication
3. Downloads the package file to the local storage if all the required parameters are found (gwaDownloadFile(fileUrl, fileIdentifier, fileHash, authentication, 5)).
4. Forwards the task to the southbound interface (gwaPublishTask(did, id, taskType, details)).
5. Receives the task result on the southbound interface.
6. Forwards the task update to the northbound interface (gwaPublishTaskUpdate(did, id, taskType, finished, details)).
In case of an error, the task is aborted and the corresponding error message is displayed. The error task result is then forwarded to the northbound interface.
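The three nearly identical nil checks in the script could be driven by a list of required parameter IDs instead. The sketch below is one possible restructuring, not the shipped example; first_missing and REQUIRED are hypothetical names, and the error strings mirror those passed to abort_task in the script.

```lua
-- Parameter IDs the firmware upgrade script treats as mandatory.
local REQUIRED = { "url", "file_name", "signature" }

-- Hypothetical helper: returns an error message for the first required
-- parameter that is absent, or nil when all of them are present.
local function first_missing(parameters)
    local seen = {}
    for _, param in ipairs(parameters) do
        seen[param.id] = param.value
    end
    for _, id in ipairs(REQUIRED) do
        if seen[id] == nil then
            return id .. " parameter not found"
        end
    end
    return nil
end

print(first_missing({ { id = "url", value = "http://10.10.6.116/1.deb" } }))
-- file_name parameter not found
```

In task(), the returned message could be handed to abort_task before any download is attempted.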
Example of a Command Payload from the relayr Cloud
Sending tasks from the relayr Cloud is handled by means of commands. When you send a command, the Cloud Adapter receives it from the device/{device_id}/commands topic.
The Cloud Adapter can handle commands with any message payload. It doesn't require any specific payload format to receive a command and forward it to the Task Executor.
Here is an example of the message payload format for a package update command:
{
    "id": "<relayr_cloud_command_id>",
    "name": "package.gwa-pkg-v1.1",
    "parameter": [
        { "id": "operation_id", "value": "111" }, // operation id
        { "id": "type", "value": "gwa-pkg" }, // package type
        { "id": "url", "value": "https://boo.com/pkg.tgz" }, // package URL
        { "id": "size", "value": 1048576 }, // package size
        { "id": "signature_type", "value": "sha-256" }, // signature type - optional, e.g. "sha-256", "sha-1"
        { "id": "signature", "value": ".." }, // signature - optional
        { "id": "file_name", "value": "gwa-pkg.tgz" } // destination file name - optional
    ],
    "timestamp": "2018-03-06T07:52:48.030+0000"
}
Here is an example of the curl request to the relayr Cloud to send a package update command:
curl -H "Authorization: Bearer $TOKEN" "https://cloud.rlr-eu-stg.relayr.io/devices/0ad304ac-4b69-46b2-9062-1ed1a3c14d3d/commands" -H "Content-Type: application/json" -d '[{"name": "package.test", "parameter": [{"id": "operation_id", "value": "1ea"}, {"id":"type", "value": "deb"}, {"id":"url", "value":"http://10.10.6.116/1.deb"}, {"id": "signature", "value": "ba0ebc666d10e5bc8f3101642138258413e6cd897a46b9a83f4878acdf545078"}, {"id":"signature_type", "value":"sha256"}, {"id" : "file_name", "value": "gwa-osisoft-adapter-c" }]}]' | jq .
The response of a successfully sent command contains the identifier (id) assigned to the command by the relayr Cloud, as shown below.
{
    "data": [
        {
            "id": "4501cb45-4133-46dc-8d9a-7f08dfea7cf1",
            "name": "package.test",
            "parameter": [
                {
                    "id": "operation_id",
                    "value": "1ea"
                },
                {
                    "id": "type",
                    "value": "deb"
                },
                {
                    "id": "url",
                    "value": "http://10.10.6.116/1.deb"
                },
                {
                    "id": "signature",
                    "value": "ba0ebc666d10e5bc8f3101642138258413e6cd897a46b9a83f4878acdf545078"
                },
                {
                    "id": "signature_type",
                    "value": "sha256"
                },
                {
                    "id": "file_name",
                    "value": "gwa-osisoft-adapter-c"
                }
            ],
            "timestamp": "2020-03-27T06:45:02.276+0000"
        }
    ]
}
For more information on sending commands from the relayr Cloud, see the MQTT documentation.
Configuration File Example
{
    "mqtt_connection": {
        "host": "127.0.0.1",
        "port": 1883,
        "keepalive": 20,
        "max_inflight_messages": 100,
        "qos_levels": {
            "measurements": 0,
            "alerts": 1,
            "configuration": 1,
            "metadata": 1
        }
    },
    "http_connection": {
        "host": "127.0.0.1",
        "port": 22080,
        "use_ssl": false,
        "ca_file": "/etc/ssl/certs/ca-certificates.crt",
        "auth_enable": false,
        "username": "",
        "password": "",
        "http_timeout": 10
    },
    "health_monitor": {
        "heartbeat_interval": 60
    },
    "engine": {
        "manifest_file": "task-manifest.json",
        "executor_pool_size": 5,
        "task_timeout": 0,
        "task_timeout_check_period": 15,
        "task_log_severity": "INFO"
    },
    "database": {
        "path": "./task-executor.db",
        "max_task_entries": 10000
    }
}