Rule Engine
The Rule Engine allows you to customize the data processing logic at the Edge without recompiling software. You can define all application-specific data processing logic as a set of rules written in the Lua scripting language. The Rule Engine can execute rules asynchronously when new data is published, according to a schedule defined in a cron-like language, or after each change of the business parameters configuration.
To streamline the creation of rules, the Rule Engine provides high-level function calls.
For a step-by-step guide on how to install the Rule Engine, see the Installation Guide.
Rule Engine Capabilities
Examples of the Rule Engine data processing capabilities include:
calculating the average value from a set of measurement values.
correlating several data types into a single alert.
converting data into different units (e.g. converting temperature from Celsius to Fahrenheit).
scheduling actuation of devices (e.g. turning the light on and off at a specified day and time).
aggregating data in order to create e.g. a heat index.
generating periodic reports about the state of the system.
creating chained rules, in which the output of one rule becomes the input for another rule.
creating context persistent rules, e.g. to generate performance rating.
creating rules with configurable parameters, e.g. user-defined alert thresholds.
Configuring the Rule Engine
The deployment configuration file of the Rule Engine is located at:
/etc/relayr/gwa-rule-engine-cpp/gwa-rule-engine-config.json
To modify the file, edit it in a text editor.
The file contains the following configuration settings:
mqtt_connection:
Configuration Option | Description | Default Value |
---|---|---|
host | Hostname or IP address of the MQTT broker. | 127.0.0.1 |
port | Port number for connecting to the MQTT broker. | 1883 |
keepalive | MQTT connection keep alive interval. | 20 |
max_inflight_messages | The maximum number of QoS 1 or 2 messages that can be in the process of being transmitted simultaneously. | 100 |
qos_levels | Quality of Service level for each traffic type. | measurements: 0, alerts: 1, configuration: 1, metadata: 1 |
http_connection:
Configuration Option | Description | Default Value |
---|---|---|
host | Hostname or IP address of the Agent HTTP interface. | 127.0.0.1 |
port | Port number of the Agent HTTP interface. | 22080 |
use_ssl | Determines if the connection to the Agent interface requires SSL connection. | false |
ca_file | Path to SSL Certificate Authorities file. | /etc/ssl/certs/ca-certificates.crt |
auth_enable | Determines if the Agent HTTP interface requires authorization. | false |
username | Username to the Agent HTTP interface. | "" |
password | Password to the Agent HTTP interface. | "" |
http_timeout | Agent HTTP request timeout in seconds. If missing, the default value of 10 seconds is assumed. | 10 |
pmq_connection - Configure this section to use the High Speed Bus (PMQ) communication channel:
Configuration Option | Description | Default Value |
---|---|---|
input_queues | List of PMQ queues to monitor for inbound messages. | ["/gwa-rule-engine"] |
message_size | Maximum size of a PMQ message, given in bytes. | 8192 |
message_count | Maximum number of messages in a PMQ queue. | 10 |
rest:
Configuration Option | Description | Default Value |
---|---|---|
host | IP address on which REST API is served. | 127.0.0.1 |
port | Port number on which REST API is served. | 25080 |
health_monitor:
Configuration Option | Description | Default Value |
---|---|---|
heartbeat_interval | Time interval at which the periodic heartbeat messages are sent, given in seconds. For details, see the Edge Platform Monitoring section. | 60 |
engine:
Configuration Option | Description | Default Value |
---|---|---|
manifest_file | Path to the rule manifest file. This file contains a list of rules and criteria that trigger their execution. For more information, see the Rule Manifest File section. | rule-manifest.json |
blacklist_file | Path to the rule blacklist file. For more information, see the Rule Blacklist File section. | rule-blacklist.json |
business_params_file | Path to the file which contains values of business parameters. See here for more information. | business-params.json |
cache_dir | Path to the directory which contains rule context persistency files. For more information, see the Creating Context Persistent Variables section. | . |
executor_pool_size | Number of worker threads which execute rules. | 5 |
max_exec_time | Maximum allowed time of a rule execution, given in milliseconds. This global setting can be overwritten by settings for individual rules, defined in the rule manifest file. If the maximum execution time is exceeded, the Rule Engine logs a detailed error and disables the rule. For more information, see the Rule Blacklist File section. | 100 |
rule_log_severity | Severity of a built-in rule logger. One of: TRACE , DEBUG , INFO , WARNING , ERROR , CRITICAL . You can overwrite this global setting by a rule-specific severity defined in the rule manifest file. When you change this setting, you must also change the global logging level of the Rule Engine accordingly. For more information, see the Adding Debug Mode to Rules section. | INFO |
When you edit the configuration file, restart the Rule Engine to apply your changes:
systemctl restart gwa-rule-engine-cpp
For an example of the Rule Engine configuration file, see here.
Creating Rules
Rules define the logic of data processing of the Gateway Agent. Each rule is a separate file, which contains a Lua script. A list of rules executed by the Rule Engine and criteria triggering their execution are included in the rule manifest file.
To include a rule in the Rule Engine, follow these steps:
1. Create a .lua file defining the logic of data processing.
2. In the rule manifest file, include a path to the rule .lua file and define triggers of the rule execution.
See the sections below for detailed information on creating rules.
Rule Manifest File
The rule manifest file is a .json file that contains a set of rules that are executed by the Rule Engine and defines triggers of the rule execution.
Optionally, you can also define a maximum allowed execution time and a rule log severity for every rule.
The default rule manifest file name is rule-manifest.json, but you can configure a different path to the file in the manifest_file option in the Rule Engine configuration file. See the Configuring the Rule Engine section for details.
The file is read once, at the application startup.
If you have the Configuration Manager installed, you can use its graphical interface to view and edit the rule manifest file. For more information, see here.
The rule manifest file has the following structure:
{
"<unique-rule-name-1>": {
"triggers": {
"on-publish": ["<topic-1>", "<topic-2>", ...],
"on-config-change": true
},
"optimize_disk_usage": true,
"file": "<path-to-lua-file-1>"
},
"<unique-rule-name-2>": {
"triggers": {
"on-tick": ["<schedule-1>", "<schedule-2>", ...]
},
"max_exec_time": 50,
"rule_log_severity": "ERROR",
"file": "<path-to-lua-file-2>"
},
"<unique-rule-name-3>": {
"triggers": {
"on-pmq": ["<topic-1>", "<topic-2>", ...],
"on-tick": ["<schedule-1>", "<schedule-2>", ...],
"on-publish": ["<topic-1>", "<topic-2>",...]
},
"max_exec_time": 200,
"rule_log_severity": "TRACE",
"cached-topics": {
"on-publish": ["<cached-mqtt-topic-1>", "<cached-mqtt-topic-2>", ...],
"on-pmq": ["<cached-pmq-topic-1>", ...]
},
"file": "<path-to-lua-file-3>"
}
}
To include a rule in the Rule Engine, add the following mandatory information about it to the rule manifest file:
- Rule name - User-defined string which uniquely identifies the rule, e.g. 1-data-correlation.
- triggers - Criteria triggering the rule execution. Possible values are: on-publish, on-tick, on-pmq, and on-config-change. See the Triggers of the Rule Execution section for more information.
- file - Path to the rule file, e.g. examples/2-data-conversion.lua.
Optionally, you can also define the following settings for a rule:
- max_exec_time - Constraint for a rule execution time, in milliseconds. If the maximum execution time is exceeded, the Rule Engine logs an error and disables the rule. For more information, see the Rule Blacklist File section.
- rule_log_severity - Severity of the built-in rule logger. One of: TRACE, DEBUG, INFO, WARNING, ERROR, CRITICAL. For more information, see the Adding Debug Mode to Rules section.
If defined for a rule, max_exec_time and rule_log_severity overwrite the global settings configured in the engine section of the configuration file.
- optimize_disk_usage - This parameter controls whether persistent variables are synchronized with the persistent storage on every rule execution (if set to false) or only after the rule's cleanup (if set to true). If not specified, the parameter value defaults to false.
It is recommended to set the optimize_disk_usage parameter to true for slower disks or a large amount of data, to improve the performance of rules.
- cached-topics - A list of topics to cache. In the on-publish array, define a set of MQTT topics to cache. In the on-pmq array, define a set of PMQ topics to cache. See here for more information.
Triggers of the Rule Execution
There are four types of triggers that can initiate a rule execution:
Publishing new data to an MQTT topic:
A rule is executed each time data of a specified type is published. For example, when the Rule Engine receives a temperature measurement in Celsius, it converts its unit to Fahrenheit.
To use this type of trigger, add the "on-publish" field to the rule manifest file with the data topic or topics given in brackets, e.g. ["v1/nb/mydev/measurements/temperatureC"]. The rule is triggered when data is published on a given topic.
To include more topics, separate them by commas, e.g. ["v1/nb/mydev/measurements/engine", "v1/nb/mydev/measurements/temperature"].
Both literally defined topics and wildcards are supported.
Reading data from a PMQ queue:
A rule is executed when data matching a defined topic is read from a PMQ queue.
To use this type of trigger, add the "on-pmq" field to the rule manifest file with the data topic or topics given in brackets: ["<topic-1>", "<topic-2>"].
Both literally defined topics and wildcards are supported.
The list of PMQ queues monitored by the Rule Engine must be configured in the pmq_connection.input_queues field of the configuration file.
Defined schedule:
A rule is executed synchronously, according to a schedule defined in a cron-like language. For example, a device is turned off on Fridays, at 5 p.m.
To use this type of trigger, add the "on-tick" field to the rule manifest file with a schedule given in brackets, e.g. ["0/5 * * * * *"].
To include more schedules, separate them by commas, e.g. ["0/5 * * ? * MON-FRI", "0/15 * * ? * SAT-SUN"].
The scheduling is handled by the ccronexpr library.
For more information and examples of how to create schedules, see the Cron Expressions section.
Change of business parameters:
A rule is executed after every change of business parameters. The trigger is generated when a new configuration of business parameters is stored in the persistent memory.
To use this trigger, add the "on-config-change": true field to the rule manifest file.
For a rule example that is based on this trigger type, see here.
With this trigger type, you can initialize business parameters using the rule setup() function and load the value of a given parameter only if the rule was triggered by the configuration change. See here for more information.
You can define more than one trigger type for a rule. All types of triggers can be used in any combination.
Rule execution is sequential. If a new trigger comes during a rule execution, the rule will be executed again only after the last execution ends.
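When a rule combines several trigger types, the rule body can branch on the gwaTrigger variable. The following is a minimal sketch, assuming the trigger names CRON, MQTT and PMQ listed in the Supported Lua Functions section. The describeTrigger helper and the print fallback are hypothetical and exist only so that the snippet also runs outside the Rule Engine.

```lua
-- Stand-in so the sketch also runs in a plain Lua interpreter;
-- inside the Rule Engine the built-in gwaLogInfo is used instead.
gwaLogInfo = gwaLogInfo or print

-- Hypothetical helper: maps a trigger name to a short description.
local function describeTrigger(trigger, topic)
  if trigger == "MQTT" then
    return "MQTT publication on " .. tostring(topic)
  elseif trigger == "PMQ" then
    return "PMQ message"
  elseif trigger == "CRON" then
    return "schedule tick"
  end
  return "other trigger: " .. tostring(trigger)
end

function rule()
  gwaLogInfo("Rule started by " .. describeTrigger(gwaTrigger, gwaTopic))
end
```

Keeping the branching in one small helper makes it easier to add or remove trigger types in the manifest without restructuring the rule body.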
Caching MQTT and PMQ Topics
The Rule Engine enables caching MQTT and PMQ topics. If enabled, the last messages received on these topics are stored and become available during the rule execution.
To configure the topic caching, add the cached-topics setting to the rule manifest file and define:
- The on-publish array with a set of MQTT topics to cache. Both literally defined topics and wildcards are supported.
- The on-pmq array with a set of PMQ topics to cache. Both literally defined topics and wildcards are supported.
Cached topics must be unique between all types of messages.
Example:
"<unique-rule-name>": {
"triggers": {
"on-publish": ["<topic-1>", "<topic-2>",...],
"on-pmq": ["<pmq-topic-1>", "<pmq-topic-2>", ...]
},
"cached-topics": {
"on-publish": ["example_cached_topic", "<topic-2>", ...],
"on-pmq": ["<pmq-topic-1>", ...]
}
}
You can retrieve the cached messages using the gwaLoadCachedTopic(string topicName) function, which returns the last message received on the cached topicName topic. It returns GWA_JSON_NULL if no value is currently cached.
Example:
function rule()
local cached = gwaLoadCachedTopic("example_cached_topic")
if (cached == GWA_JSON_NULL) then
gwaLogInfo("No cached value")
else
gwaLogInfo("Cached value: " .. cached["value"])
end
end
Supported Lua Functions
A rule body is executed by an embedded Lua interpreter v5.3. To execute a rule, the Rule Engine calls the rule() function, which is at the beginning of each rule file.
Additionally, the Rule Engine exposes the following global utility variables and functions:
Name | Description |
---|---|
GWA_SB | Southbound topic prefix (v1/sb ). |
GWA_NB | Northbound topic prefix (v1/nb ). |
GWA_JSON_NULL | Use this to set null in JSONs generated by the rule Lua code. |
gwaTrigger | Variable that contains the name of the event which triggered the rule execution. Can be one of: CRON, MQTT or PMQ. |
gwaData | Variable initialized when a rule execution is triggered by data publication. Contains the published data. |
gwaTopic | Variable initialized when a rule execution is triggered by data publication. Contains the topic on which data was published. |
gwaExtractFromTopic(integer position) | Extracts a token at the n-th position from gwaTopic . |
gwaGetTimestamp() | Returns a current number of milliseconds since the Unix epoch. |
gwaDeclarePersistent(string varName, any varInitialValue) | Declares a persistent global variable. |
gwaLoadBusinessParams() | Loads all business parameters. |
gwaLoadBusinessParam(string paramName, any paramDefaultValue) | Loads the value of an ungrouped business parameter. |
gwaLoadBusinessParam(string groupName, string paramName, any paramDefaultValue) | Loads the value of a business parameter from a specific parameter group. |
gwaLoadBusinessParam(string groupName, string sectionName, string paramName, any paramDefaultValue) | Loads the value of a business parameter from a specific parameter group and a section. |
gwaIsBusinessParamSet(string paramName) | Checks if the value of an ungrouped business parameter has been modified via the REST interface. |
gwaIsBusinessParamSet(string groupName, string paramName) | Checks if the value of a business parameter that belongs to a group has been modified via the REST interface. |
gwaIsBusinessParamSet(string groupName, string sectionName, string paramName) | Checks if the value of a business parameter that belongs to a group and a section has been modified via the REST interface. |
gwaLoadCachedTopic(string topicName) | Returns the last message received on the cached topicName topic. Returns GWA_JSON_NULL if no value is currently cached. See here for more information. |
gwaPublish(string topic, any data) | Publishes data of any type. |
gwaPublishMeasurement(string prefix, string deviceId, string id, integer timestamp, boolean value) | Publishes measurement data. |
gwaPublishAlert(string prefix, string deviceId, string id, integer timestamp, boolean set, string message) | Publishes alert data. |
gwaPublishLog(string prefix, string deviceId, string id, integer timestamp, string severity, string message) | Publishes log data. |
gwaPublishMetadata(string prefix, string deviceId, string id, any value) | Publishes device metadata. |
gwaPublishConfig(string prefix, string deviceId, string id, any value) | Publishes configuration data. |
gwaPublishPeer(string prefix, string deviceId, string id, any attributes) | Publishes peer data. |
gwaPublishTaskUpdate(string prefix, string deviceId, string id, string type, boolean finished, table details) | Publishes task update data. |
gwaGet(string uriPath) | Returns any data. |
gwaGetDevices(string deviceId) | Returns device data. Pass empty deviceId for data of all devices. |
gwaGetMeasurement(string deviceId, string measurementId) | Returns measurement data. Pass empty measurementId for data of all measurements. |
gwaGetAlert(string deviceId, string alertId) | Returns alert data. Pass empty alertId for data of all alerts. |
gwaGetLog(string deviceId, string logId) | Returns log data. Pass empty logId for data of all logs. |
gwaGetMetadata(string deviceId, string metadataId) | Returns metadata. Pass empty metadataId for all metadata. |
gwaGetConfig(string deviceId, string configId) | Returns configuration data. Pass empty configId for all configuration data. |
gwaGetPeer(string deviceId, string peerId) | Returns peer data. Pass empty peerId for data of all peers. |
gwaGetTask(string deviceId, string taskId) | Returns task data. Pass empty taskId for data of all tasks. |
gwaSubmitTask(string deviceId, string type, table details) | Submits a task for execution. Returns the ID of a new task. |
gwaJsonToString(any jsonValue) | Returns a string representation of the jsonValue . |
gwaLogTrace(string logEntry) | Logs a message with the TRACE severity. |
gwaLogDebug(string logEntry) | Logs a message with the DEBUG severity. |
gwaLogInfo(string logEntry) | Logs a message with the INFO severity. |
gwaLogWarning(string logEntry) | Logs a message with the WARNING severity. |
gwaLogError(string logEntry) | Logs a message with the ERROR severity. |
gwaLogCritical(string logEntry) | Logs a message with the CRITICAL severity. |
gwaFFT(table input, string window, string normalization, bool calculateAbs = true) | Calculates an FFT of a real-valued signal. An input signal is padded with 0s up to the nearest power of 2(N). Returns N/2 FFT samples. Available windows: Rectangular , Hamming , Hann . Available normalization strategies: None , Unitary , Standard . An FFT can also be calculated by the relayr Cloud. Example of usage: Maintenance Solution backend uses the Hann window with the Unitary normalization. By default, the returned array consists of absolute values of complex numbers being the result of FFT. If you set calculateAbs=false , the returned array contains both parts of the result (real and imaginary). The first N elements represent the real parts, followed by N numbers representing the imaginary part, e.g. Re(x)=a[i], Im(x)=a[i+S/2] , where S is the array size. |
gwaIFFT(table input, bool returnImaginaryPart = false) | Calculates the Inverse Fourier Transform of a real-valued signal. The input signal is padded with 0s up to the nearest power of 2(N). Returns a (N/2)-element array, by default. By default, the returned array contains the real part of numbers being the result of IFFT. If you set returnImaginaryPart=true , the returned array contains both parts of the result (real and imaginary). The first N rows represent the real part, followed by N numbers representing the imaginary part, e.g. Re(x)=a[i], Im(x)=a[i+S/2] , where S is the array size. |
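The split layout described for gwaFFT with calculateAbs=false (real parts in the first half of the array, imaginary parts in the second half) can be unpacked with a small helper. This is a sketch under that assumption and assumes a 1-based Lua array; the magnitudes helper and the sample values are hypothetical and are not part of the Rule Engine API.

```lua
-- Hypothetical helper: takes an array laid out as
-- { Re_1, ..., Re_n, Im_1, ..., Im_n } and returns the n magnitudes.
local function magnitudes(a)
  local half = math.floor(#a / 2)
  local out = {}
  for i = 1, half do
    local re, im = a[i], a[i + half]
    out[i] = math.sqrt(re * re + im * im)
  end
  return out
end

-- Sample values standing in for a result such as
-- gwaFFT(samples, "Hann", "Unitary", false).
local spectrum = magnitudes({ 3, 0, 4, 1 })
print(spectrum[1], spectrum[2])
```

In the sample, the first element combines Re = 3 with Im = 4 and the second combines Re = 0 with Im = 1.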
Adding Debug Mode to Rules
The Rule Engine allows you to add logs with various severity levels to rules.
To log information about the rule:
1. In the rule .lua file, define which information to log, using the gwaLog... functions. For a list of available functions, see the Supported Lua Functions section.
2. Configure a rule_log_severity. Available options: TRACE, DEBUG, INFO, WARNING, ERROR, CRITICAL. You can define both a global rule_log_severity in the engine section of the configuration file and a log severity for individual rules in the rule manifest file.
A rule_log_severity defined for an individual rule overwrites the global rule_log_severity setting.
You can find rule logs in the Rule Engine logs.
The rule_log_severity setting, which defines the logging level for the Lua logger, must be aligned with the logging level for the whole Rule Engine (see here for information on how to change it). These two settings are two different hierarchy levels of logging information, where the Lua logger is a child logger of the global Rule Engine logger.
Example: For a message with the DEBUG severity to show in the Rule Engine logs, the following chain of actions must take place:
1. Lua logs a message at the DEBUG level.
2. Lua logger's verbosity level is at least DEBUG (rule_log_severity setting in the configuration file).
3. The message is forwarded to the root logger of the whole Rule Engine.
4. The Rule Engine service verbosity is at least DEBUG.
5. The message is written to the logs.
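The chain above amounts to two filters applied in sequence. The following sketch models it in plain Lua for illustration only. It is not the Rule Engine's implementation, and it assumes that the engine-wide logger uses the same severity names as rule_log_severity; the reachesLogs helper is hypothetical.

```lua
-- Severity names from the documentation, ordered from most to least verbose.
local LEVELS = { TRACE = 1, DEBUG = 2, INFO = 3, WARNING = 4, ERROR = 5, CRITICAL = 6 }

-- Illustrative model: a message is written only if it passes the Lua
-- (child) logger filter and then the Rule Engine (root) logger filter.
local function reachesLogs(messageSeverity, ruleLogSeverity, engineSeverity)
  local level = LEVELS[messageSeverity]
  return level >= LEVELS[ruleLogSeverity] and level >= LEVELS[engineSeverity]
end

print(reachesLogs("DEBUG", "DEBUG", "INFO"))   -- false: stopped by the root logger
print(reachesLogs("DEBUG", "DEBUG", "DEBUG"))  -- true
```

The first call shows why lowering rule_log_severity alone is not enough when the Rule Engine itself logs at a less verbose level.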
Creating Context Persistent Variables
The Rule Engine allows you to preserve values of variables not only between subsequent rule executions, but also between the Rule Engine restarts.
You can save and retrieve the required rule context data manually or use the gwaDeclarePersistent function, which facilitates the process:
gwaDeclarePersistent(string varName, any varInitialValue)
Example:
gwaDeclarePersistent("lastState", "none")
gwaDeclarePersistent("lastMessage", "none")
A call to this function creates a global variable with its name passed in the first parameter and initializes it to the value passed in the second parameter (during the first rule execution) or the value read from the persistent storage (during all subsequent executions).
When the rule execution is finished, the Rule Engine saves the values of all variables declared as persistent in the persistent storage. You can configure the persistent storage directory in the engine.cache_dir setting of the configuration file.
For each rule that contains context persistent variables, the Rule Engine creates a separate .json file. For example, for a rule named rule1, the file name is rule1.json. The file contains variable names and their latest saved values.
Example:
{
"lastState": true,
"lastMessage": "Temperature alert"
}
See here for a rule example that includes the rule context persistency for alert states and messages.
Warning: If the Agent is deployed on Nano devices which use an SD card to store data, saving context persistent variables may have a negative impact on the data carrier's lifetime.
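As an illustration of the mechanism described above, the following sketch counts rule executions in a persistent variable. The variable name executionCount and the placement of the declaration at the top level of the rule file are assumptions. The fallback definitions are hypothetical stand-ins that keep values only in memory, so that the snippet also runs outside the Rule Engine.

```lua
-- Stand-ins for running outside the Rule Engine; inside it, the built-in
-- gwaDeclarePersistent and gwaLogInfo functions are used instead.
gwaDeclarePersistent = gwaDeclarePersistent or function(varName, varInitialValue)
  if _G[varName] == nil then _G[varName] = varInitialValue end
end
gwaLogInfo = gwaLogInfo or print

-- Creates the global variable executionCount, initialized to 0 or,
-- in the Rule Engine, to the value read from the persistent storage.
gwaDeclarePersistent("executionCount", 0)

function rule()
  executionCount = executionCount + 1
  gwaLogInfo("Rule executed " .. executionCount .. " time(s)")
end
```

With the real gwaDeclarePersistent, the counter would be expected to continue from its last saved value after a Rule Engine restart.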
Processing Complex JSON Message Payloads
In a Lua rule, you can get an individual value from a complex JSON message payload, for example an analytics_event payload.
Complex message types are converted to Lua tables.
To extract an individual value, use the gwaData variable and indicate which data you would like to get.
Example
Message payload:
{
"top":{
"middle":{
"bottom":{
"key1":"value1",
"key2":"value2"
}
}
}
}
- To get value1, use: gwaData.top.middle.bottom.key1
- To get value2, use: gwaData.top.middle.bottom.key2
- To get the whole bottom object, use: gwaData.top.middle.bottom
Alternatively, you can also use:
- To get value1: gwaData["top"]["middle"]["bottom"]["key1"]
- To get value2: gwaData["top"]["middle"]["bottom"]["key2"]
- To get the whole bottom object: gwaData["top"]["middle"]["bottom"]
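Both notations raise a Lua error if an intermediate object is missing from the payload, because indexing a nil value is not allowed. A defensive accessor can avoid this. The dig helper below is hypothetical (not a Rule Engine function), and the payload table stands in for gwaData.

```lua
-- Hypothetical helper: walks nested tables key by key and returns nil
-- as soon as an intermediate value is not a table.
local function dig(tbl, ...)
  local node = tbl
  for _, key in ipairs({ ... }) do
    if type(node) ~= "table" then return nil end
    node = node[key]
  end
  return node
end

-- Sample payload standing in for gwaData.
local payload = { top = { middle = { bottom = { key1 = "value1", key2 = "value2" } } } }

print(dig(payload, "top", "middle", "bottom", "key1"))  -- value1
print(dig(payload, "top", "missing", "key1"))           -- nil
```

In a rule, the same call would take gwaData as its first argument, e.g. dig(gwaData, "top", "middle", "bottom", "key1").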
Creating Rules with Configurable Parameters
The Rule Engine allows you to create rules with user-defined parameter values, e.g. configurable alert thresholds. See the sections below for more information.
Configuring Business Parameters
You can define and modify configurable business parameters using the Rule Engine's REST API interface, configured in the rest section of the Rule Engine's configuration file. For details on actions you can perform using the REST interface, refer to the API documentation.
By default, the defined parameters are stored in the business-params.json file. You can configure the file name in the business_params_file setting of the Rule Engine's configuration file.
For increased clarity, you can arrange business parameters into groups and sections. For example, you can create a separate group of parameters for each of your devices. This allows you to create the same parameters with different values, e.g. different alert thresholds, for each device.
Example: There are two devices, device-1 and device-2. Both of them have a temperature-based warning alert. You can define the alert threshold independently for each device, e.g. set it to 110 for device-1 and to 90 for device-2.
Parameters within groups may be further divided into sections. For example, use sections if you want to define other types of configuration parameters for a device, apart from the Rule Engine's business parameters. Put the Rule Engine's parameters in one section, e.g. the configuration section, and your remaining parameters in some other sections.
Example:
{
"device-1-parameters": {
"configuration": {
"engineTempWarningThreshold": 110,
"engineTempCriticalThreshold": 130,
"allowedPacketDropRate": 10
},
"other-parameters": {}
},
"device-2-parameters": {
"configuration": {
"engineTempWarningThreshold": 90,
"engineTempCriticalThreshold": 100,
"allowedPacketDropRate": 6
},
"other-parameters": {}
}
}
Using Configurable Parameters in Rules
To create a rule with a configurable parameter, use the gwaLoadBusinessParam function, which loads the configured value of a business parameter.
If no parameter value has been defined via the REST interface, the Rule Engine uses the default value, which you must include in the gwaLoadBusinessParam function.
Example 1 - ungrouped parameter:
engine_temp_threshold = gwaLoadBusinessParam("engine_temp_threshold", 90)
In this example, engine_temp_threshold is an ungrouped configurable parameter with the default value 90.
Example 2 - parameter belonging to a group:
external_temp_threshold = gwaLoadBusinessParam("device_1", "external_temp_threshold", 80)
In this example, external_temp_threshold is a configurable parameter, which belongs to the device_1 parameter group. The default parameter value is 80.
Example 3 - parameter belonging to a group and a section:
critical_temp_threshold = gwaLoadBusinessParam("device_2", "configuration", "critical_temp_threshold", 110)
In this example, critical_temp_threshold is a configurable parameter, which belongs to the device_2 parameter group and the configuration section. The default parameter value is 110.
To load all business parameters, use the gwaLoadBusinessParams() function.
Additionally, you can use the gwaIsBusinessParamSet function, which checks if the parameter value has already been defined via the REST interface. This allows you e.g. to use the parameter value only if it has been configured from the Cloud and ignore the default value provided in the rule.
See here for a rule example with a configurable parameter.
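The functions above can be combined in a single rule. The following sketch compares an incoming measurement with a grouped threshold and reports whether the threshold came from the REST interface or from the default. The device_1 group and external_temp_threshold parameter are taken from Example 2; the message text is an assumption, and the fallback definitions are hypothetical stand-ins so that the snippet also runs outside the Rule Engine.

```lua
-- Stand-ins for running outside the Rule Engine; the built-in functions
-- read the values configured via the REST interface.
gwaLoadBusinessParam = gwaLoadBusinessParam or function(...)
  local args = { ... }
  return args[#args]  -- nothing configured: fall back to the default value
end
gwaIsBusinessParamSet = gwaIsBusinessParamSet or function() return false end
gwaLogWarning = gwaLogWarning or print

function rule()
  local threshold = gwaLoadBusinessParam("device_1", "external_temp_threshold", 80)
  if gwaData.value > threshold then
    local source = gwaIsBusinessParamSet("device_1", "external_temp_threshold")
      and "configured" or "default"
    gwaLogWarning("Temperature " .. gwaData.value .. " exceeds the " ..
      source .. " threshold " .. threshold)
  end
end
```

Loading the parameter inside rule() picks up the current value on every execution; loading it in setup() and on the on-config-change trigger is an alternative when the lookup should happen less often.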
Triggering Rules On Configuration Change
To trigger a rule when the business parameters configuration has changed, use the on-config-change rule trigger. In the rule manifest file, add the "on-config-change": true field.
You can combine this trigger with any other trigger type, e.g. data publication.
Example:
{
"13-on-config-change": {
"triggers": {
"on-publish": ["v1/sb/mydev/measurements/temperature-config"],
"on-config-change": true
},
"file": "examples/13-on-config-change.lua"
}
}
See here for a rule example that uses the on-config-change trigger for generating alerts, and here for a rule example that uses this trigger to handle device's remote configuration changes.
When triggering rules on-config-change, you can use the rule setup() function to initialize the business parameter values. This allows you to check the value of a given parameter only if the rule execution was triggered by the configuration change. In this case, the .lua rule has the following structure:
local threshold = nil
function setup()
threshold = gwaLoadBusinessParam(...)
end
function rule()
if gwaTrigger == "ON-CFG-CHANGE" then
threshold = gwaLoadBusinessParam(...)
elseif gwaTrigger == "MQTT" then
...
end
end
See here for a rule example that uses the setup() function to initialize the business parameter value.
Using Rule Setup and Cleanup Facilities
The Rule Engine allows you to use the setup() and cleanup() functions to make changes to the rule environment. You can optionally use these functions in a rule body, apart from the main rule() function.
Use the setup() function to perform any actions required to initialize the rule environment to ensure it operates correctly. The setup() function is called only once, between the Rule Engine start and the first execution of the main rule() function.
Example:
function setup()
print("Setting up after Rule Engine startup")
end
Use the cleanup() function to perform any actions required to revert the rule environment to the initial state. The cleanup() function is called only once, between the last execution of the main rule() function and the Rule Engine shutdown.
Example:
function cleanup()
print("Cleaning up before Rule Engine shutdown")
end
Unlike the rule() function, which is executed each time the rule is triggered, the setup() and cleanup() functions are always executed after the Rule Engine's start or before its shutdown, respectively.
In the setup() and cleanup() functions, you can use all the Rule Engine utility functions available in the main rule() function. Note that no trigger information exists during the execution of the setup() and cleanup() functions, so the variables gwaTrigger, gwaTopic and gwaData are undefined.
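The three functions can share state within one rule file. The following sketch accumulates measurement values between setup() and cleanup() and logs their average at shutdown. It assumes that the published payload exposes a numeric value field, as in the unit conversion example; the print fallback is a hypothetical stand-in so that the snippet also runs outside the Rule Engine.

```lua
-- Stand-in for running outside the Rule Engine.
gwaLogInfo = gwaLogInfo or print

local sum, count

function setup()
  -- Called once before the first rule() execution: reset the accumulators.
  sum, count = 0, 0
end

function rule()
  -- gwaData is available here, but not in setup() or cleanup().
  sum = sum + gwaData.value
  count = count + 1
end

function cleanup()
  -- Called once before shutdown: report the average of the collected values.
  if count > 0 then
    gwaLogInfo("Average over " .. count .. " samples: " .. sum / count)
  end
end
```

The accumulators live only in memory in this sketch; declaring them with gwaDeclarePersistent would be needed to keep them across Rule Engine restarts.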
Importing External Lua Libraries and Custom Helper Functions
The Rule Engine allows you to import external Lua libraries and custom helper functions.
You can add information about external modules to the Lua interpreter in three different ways, described in the sections below.
For more information on importing external Lua modules, see the Lua documentation:
- sections about package.path and package.cpath
- section about the require function
Method 1 - External Code in the Standard Location
To use external Lua libraries or custom helper functions, follow these steps:
- Place your external code in the standard location of a given platform.
To find out what the standard location on a given platform is, use the interpreter to print the default content of package.path/package.cpath, e.g. using:
lua5.3 -e "print(package.path)"
- In your rule code, use the require function to specify what external modules you want to use, e.g.:
require("test")
Method 2 - Modifying Environmental Variables
To use external Lua libraries or custom helper functions, follow these steps:
- Place your external code in a custom location.
- Modify the environmental variables LUA_PATH and LUA_CPATH.
In this method, you must modify the environment of the Rule Engine's process in which rules are executed to ensure the variables point to the proper location.
- In your rule code, use the require function to specify what external modules you want to use, e.g.:
require("test")
Method 3 - Setting Paths in a Rule File
In this method, you must set the proper paths manually in every rule file.
To use external Lua libraries or custom helper functions, follow these steps:
- Place your external code in a custom location.
- In a rule file, set the package.path/package.cpath, e.g.:
package.path = "/[folder]/?.lua"
You must modify the package.path before you use the require function. Otherwise, your external module will not be imported.
- In a rule file, use the require function to specify what external modules you want to use, e.g.:
require("test")
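Assigning a new value to package.path replaces the default search locations, so modules from the standard location can no longer be found. A common alternative is to prepend the custom location to the existing value. The following sketch shows this approach; the directory /opt/rule-helpers is a hypothetical example path.

```lua
-- Hypothetical directory that holds custom helper modules.
local customDir = "/opt/rule-helpers"

-- Remember the default search path of the interpreter.
local defaultPath = package.path

-- Prepend the custom location and keep the default search locations.
package.path = customDir .. "/?.lua;" .. defaultPath

-- A later require("test") would look for /opt/rule-helpers/test.lua first
-- and then fall back to the default locations.
```

Templates in package.path are separated by semicolons and searched in order, so the custom directory takes precedence over the defaults.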
Rule Examples
This section presents two examples of rules: unit conversion and scheduled actuation. For each of them, the rule body and the rule manifest file are given. The rule body contains the data processing logic and the rule manifest file contains triggers for the rule execution.
You can also find an example of how to create chained rules, in which the output of a rule becomes the input for another rule, and an example of the rule context persistency, where the rule context can be preserved between the Rule Engine restarts.
You can also find a rule with configurable business parameters, a rule that triggers alerts, based on configuration changes, and a rule for handling device configuration changes.
For more rule examples, see the
examples
folder located at:
/usr/share/relayr/gwa-rule-engine-cpp/examples
Unit Conversion
In this example, the Rule Engine converts temperature from Celsius to Fahrenheit.
Rule body (file name: 2-data-conversion.lua):
function rule()
  local did = gwaExtractFromTopic(2)
  local temperatureC = gwaData.value
  local temperatureF = temperatureC * 9 / 5 + 32
  gwaPublishMeasurement(GWA_SB, did, "temperatureF", gwaData.ts, temperatureF)
end
Rule manifest file:
{
  "2-data-conversion": {
    "triggers": {
      "on-publish": ["v1/nb/mydev/measurements/temperatureC"]
    },
    "file": "examples/2-data-conversion.lua"
  }
}
Description:
- The rule() function is called to execute the rule.
- The rule is triggered by data publication. The input is the temperature measurement in Celsius published on the topic "v1/nb/mydev/measurements/temperatureC".
- The topic and message of the published data are available in the variables gwaTopic and gwaData.
- The read temperature is converted to Fahrenheit.
- The Fahrenheit temperature is published as a measurement.
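The conversion itself can be tried outside the Rule Engine. The sketch below leaves out the gwa* calls so that it runs in a plain Lua interpreter; the function names, including the guard for non-numeric input, are hypothetical and not part of the Rule Engine API:

```lua
-- Pure conversion helpers; no Rule Engine functions are needed here.
local function celsiusToFahrenheit(c)
  return c * 9 / 5 + 32
end

local function fahrenheitToCelsius(f)
  return (f - 32) * 5 / 9
end

-- Hypothetical guard: returns nil instead of raising an error
-- when the received value is not a number.
local function safeCelsiusToFahrenheit(value)
  if type(value) ~= "number" then
    return nil
  end
  return celsiusToFahrenheit(value)
end

print(celsiusToFahrenheit(100))  -- boiling point of water
print(celsiusToFahrenheit(-40))  -- the two scales coincide at -40
```

A guard of this kind may be useful in a rule when a malformed message could carry a non-numeric value.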
Scheduled Actuation
In this example, lights in an office are turned on at 6 a.m. and turned off at 10 p.m. on weekdays. On weekends, lights are turned on at 8 a.m. and turned off at 6 p.m.
Rule body (file name: 5-scheduled-actuation.lua):
function rule()
  local now = os.date("*t")
  local hour = now.hour
  -- in Lua, wday ranges from 1 (Sunday) to 7 (Saturday)
  local day = now.wday
  local isWeekday = day >= 2 and day <= 6
  local isWeekend = day == 1 or day == 7
  -- if 6AM week day or 8AM weekend day
  if (hour == 6 and isWeekday) or (hour == 8 and isWeekend) then
    -- turn the lights on
    local details = '{parameters: [{id: "Light_SW1", value: "on"}, {id: "Light_SW2", value: "on"}]}'
    gwaSubmitTask("mydev", "cfg-task", details)
  end
  -- if 10PM week day or 6PM weekend day
  if (hour == 22 and isWeekday) or (hour == 18 and isWeekend) then
    -- turn the lights off
    local details = '{parameters: [{id: "Light_SW1", value: "off"}, {id: "Light_SW2", value: "off"}]}'
    gwaSubmitTask("mydev", "cfg-task", details)
  end
end
Rule manifest file:
{
  "5-scheduled-actuation": {
    "triggers": {
      "on-tick": ["0 0 6,22 ? * MON-FRI", "0 0 8,18 ? * SAT,SUN"]
    },
    "file": "examples/5-scheduled-actuation.lua"
  }
}
Description:
- The rule() function is called to execute the rule.
- The rule is triggered according to a schedule: at 6 a.m. and 10 p.m. from Monday to Friday, and at 8 a.m. and 6 p.m. on Saturday and Sunday.
- The light switch names are "Light_SW1" and "Light_SW2".
- The rule output is a submitted task that changes the state (on/off) of the light switches according to the defined schedule.
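When a rule inspects the current day itself, note that the wday field returned by os.date("*t") runs from 1 (Sunday) to 7 (Saturday). The sketch below isolates the schedule decision in a pure function so that it can be checked without the Rule Engine; the names isWeekend and lightAction are hypothetical:

```lua
-- os.date("*t").wday follows the Lua convention: 1 = Sunday ... 7 = Saturday
local function isWeekend(wday)
  return wday == 1 or wday == 7
end

-- Returns "on", "off", or nil for a given hour and weekday number.
local function lightAction(hour, wday)
  if isWeekend(wday) then
    if hour == 8 then return "on" end
    if hour == 18 then return "off" end
  else
    if hour == 6 then return "on" end
    if hour == 22 then return "off" end
  end
  return nil
end

local now = os.date("*t")
print(lightAction(now.hour, now.wday))
```

Keeping the decision separate from the task submission makes it possible to test every hour and day combination directly.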
Chained Rules
The Rule Engine enables creating chained rules, in which the output of a rule becomes the input for another rule.
The Rule Engine itself does not have a global memory that is shared by all rules. Sharing the context between rules can be achieved by chaining them, as shown below.
Example:
This example uses the conversion of temperature units from Celsius to Fahrenheit, as described in the Unit Conversion section.
The chaining of rules is presented in the following two rules in the examples folder:
7-chained-rules-r1.lua
7-chained-rules-r2.lua
The first file contains a rule that publishes the converted temperature measurement on the "custom/mydev/temperatureF" topic:
function rule()
  gwaPublish("custom/mydev/temperatureF", gwaData.value * 9 / 5 + 32)
end
As shown in the rule manifest file, measurements published on this topic become a trigger for the other rule:
{
  "7-chained-rules-r2": {
    "triggers": {
      "on-publish": ["custom/mydev/temperatureF", "v1/nb/mydev/measurements/humidity"]
    },
    "file": "examples/7-chained-rules-r2.lua"
  }
}
As defined in the 7-chained-rules-r2.lua file, temperature measurements published on the "custom/mydev/temperatureF" topic are the input for calculating the heat index:
local T = nil
local R = nil
-- heat index regression coefficients (T in Fahrenheit, R = relative humidity in %)
local c1 = -42.379
local c2 = 2.04901523
local c3 = 10.14333127
local c4 = -0.22475541
local c5 = -6.83783e-3
local c6 = -5.481717e-2
local c7 = 1.22874e-3
local c8 = 8.5282e-4
local c9 = -1.99e-6
function rule()
  if gwaTopic == "custom/mydev/temperatureF" then
    T = gwaData
  end
  if gwaTopic == "v1/nb/mydev/measurements/humidity" then
    R = gwaData.value
  end
  if T ~= nil and R ~= nil then
    local HI = c1 + c2 * T + c3 * R + c4 * T * R + c5 * T * T + c6 * R * R + c7 * T * T * R + c8 * T * R * R + c9 * T * T * R * R
    gwaPublishMeasurement(GWA_SB, "mydev", "heat_index", gwaGetTimestamp(), HI)
  end
end
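The heat index calculation can be checked in isolation. The sketch below assumes the widely published Rothfusz regression (temperature in Fahrenheit, relative humidity in percent), which is usually stated to be valid for roughly 80 °F and 40 % humidity and above. The function name heatIndex is hypothetical:

```lua
-- Rothfusz regression coefficients c1..c9
-- (temperature in degrees Fahrenheit, relative humidity in percent)
local c = {
  -42.379, 2.04901523, 10.14333127, -0.22475541,
  -6.83783e-3, -5.481717e-2, 1.22874e-3, 8.5282e-4, -1.99e-6,
}

local function heatIndex(T, R)
  return c[1] + c[2] * T + c[3] * R + c[4] * T * R
       + c[5] * T * T + c[6] * R * R
       + c[7] * T * T * R + c[8] * T * R * R
       + c[9] * T * T * R * R
end

print(string.format("%.1f", heatIndex(90, 50)))
```

For 90 °F and 50 % relative humidity, the regression gives roughly 94.6 °F, which is a convenient sanity check for the signs of the coefficients.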
Context Persistency
In this example, the Rule Engine generates alerts with various messages, based on the temperature measurement values.
The latest values of the alert state and message are saved in the persistent storage. They are preserved even if the Rule Engine restarts in the meantime, which enables publishing a new alert only if the state or message has changed.
See here for more information on the rule context persistency.
Rule body (file name: 11-alert-state-persistence.lua):
gwaDeclarePersistent("lastState", "none")
gwaDeclarePersistent("lastMessage", "none")
function rule()
  local did = gwaExtractFromTopic(2)
  local temperature = gwaData.value
  local state = nil
  local message = nil
  -- evaluate current alert state & message based on temperature value
  if temperature > 130 then
    state = true
    message = "I'm melting"
  elseif temperature > 120 then
    state = true
    message = "It's too hot"
  elseif temperature > 110 then
    state = true
    message = "Temperature rising"
  else
    state = false
    message = "Temperature acceptable"
  end
  -- send alert if state or message has changed
  if lastState ~= state or lastMessage ~= message then
    gwaPublishAlert(GWA_SB, did, "temperature_high", gwaGetTimestamp(), state, message)
  end
  -- set last values to current values
  lastState = state
  lastMessage = message
end
Rule manifest file:
{
  "11-alert-state-persistence": {
    "triggers": {
      "on-publish": ["v1/nb/mydev/measurements/temperature"]
    },
    "file": "examples/11-alert-state-persistence.lua"
  }
}
Description:
- The rule is triggered by data publication. The input is the temperature measurement.
- The gwaDeclarePersistent functions declare the latest alert state and message as context persistent variables. When the rule is executed for the first time, the variables do not hold any specific values yet (as defined by the "none" parameter). For all subsequent rule executions, the values are read from the persistent storage.
- Based on the received temperature value, the Rule Engine evaluates the current alert state and message, e.g. if the temperature is above 120, the alert with the message "It's too hot" is set.
- The Rule Engine compares the current alert state and message with the latest values saved in the persistent storage and publishes a new alert only if the values have changed.
- The current alert state and message are then saved as context persistent values, e.g.:
{
  "lastMessage": "It's too hot",
  "lastState": true
}
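The change detection described above can be simulated without the Rule Engine. In the sketch below, a plain Lua table stands in for the persistent storage, and the function names evaluate and process are hypothetical; no gwa* functions are called:

```lua
-- Plain table standing in for the persistent storage (assumption for this sketch)
local persisted = { lastState = "none", lastMessage = "none" }

-- Maps a temperature to the alert state and message used in the rule.
local function evaluate(temperature)
  if temperature > 130 then return true, "I'm melting" end
  if temperature > 120 then return true, "It's too hot" end
  if temperature > 110 then return true, "Temperature rising" end
  return false, "Temperature acceptable"
end

-- Returns true when an alert would be published for this measurement.
local function process(temperature)
  local state, message = evaluate(temperature)
  local changed = persisted.lastState ~= state or persisted.lastMessage ~= message
  persisted.lastState, persisted.lastMessage = state, message
  return changed
end

for _, t in ipairs({ 100, 105, 125, 126, 135 }) do
  print(t, process(t))
end
```

In this sequence, only the measurements that move the temperature into a different band (100, 125 and 135) would lead to a published alert.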
Configurable Business Parameters
In this example, the Rule Engine generates alerts, based on a configurable threshold.
See here for more information on using configurable parameters in rules.
Rule body (file name: 9-business-parameters.lua):
function rule()
  local did = gwaExtractFromTopic(2)
  local engine_temp = gwaData.value
  if gwaIsBusinessParamSet(did) then
    local alert_name = gwaLoadBusinessParam(did, "engine_alert_name", "engine_temp_alert")
    local engine_temp_threshold = gwaLoadBusinessParam(did, "engine_temp_threshold", 90)
    if engine_temp > engine_temp_threshold then
      gwaPublishAlert(GWA_SB, did, alert_name, gwaGetTimestamp(), true, "Engine temperature is high")
    else
      gwaPublishAlert(GWA_SB, did, alert_name, gwaGetTimestamp(), false, "Engine temperature ok")
    end
  else
    gwaLogError("Business parameter group " .. did .. " was not set")
  end
end
Rule manifest file:
{
  "9-business-parameters.lua": {
    "triggers": {
      "on-publish": ["v1/sb/mydev/measurements/engine_temp"]
    },
    "file": "examples/9-business-parameters.lua"
  }
}
Description:
- The rule() function is called to execute the rule.
- The rule is triggered by data publication to the topic v1/sb/mydev/measurements/engine_temp.
- The engine_temp_threshold is a user-configurable parameter. You can define its value using the REST interface (see the API documentation for details). If no value has been defined, the parameter has the default value of 90.
- The Rule Engine triggers an alert depending on the parameter value. If the temperature is greater than the defined threshold, the engine_temp_alert alert is set. Otherwise, the alert is cleared.
- If no business parameters are set for the device (checked by the gwaIsBusinessParamSet(did) function), the Rule Engine logs an error message.
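The fallback to a default value can be illustrated with a small stand-in. The sketch below does not call the Rule Engine API: the businessParams table, its content, the pump1 group, and the loadParam stub are all hypothetical and mimic only the behavior described above, where a parameter that has not been defined takes the default passed by the rule:

```lua
-- Stand-in for the stored business parameters (hypothetical content)
local businessParams = {
  mydev = { engine_temp_threshold = 95 },
  pump1 = {},  -- group exists, but no threshold has been defined
}

-- Hypothetical stub that mimics only the fallback-to-default behavior
local function loadParam(group, name, default)
  local params = businessParams[group]
  if params == nil or params[name] == nil then
    return default
  end
  return params[name]
end

local function isTooHot(did, engineTemp)
  local threshold = loadParam(did, "engine_temp_threshold", 90)
  return engineTemp > threshold
end

print(isTooHot("mydev", 93))  -- compared against the configured threshold
print(isTooHot("pump1", 93))  -- compared against the default threshold
```

With the same measurement, the two devices produce different results because only one of them has a configured threshold.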
On Configuration Change Trigger
In this example, the Rule Engine generates alerts, based on a configurable threshold. The rule is triggered when new data is published or when the business configuration parameters are changed.
Rule body (file name: 13-on-config-change.lua):
gwaDeclarePersistent("lastState", "none")
gwaDeclarePersistent("lastTemperature", nil)
local function evaluateAlert(temperature, threshold, did)
  local state = nil
  local message = nil
  if temperature > threshold then
    state = true
    message = "It's too hot"
  else
    state = false
    message = "Temperature acceptable"
  end
  -- send alert if state has changed
  if lastState ~= state then
    gwaPublishAlert(GWA_SB, did, "temperature_high", gwaGetTimestamp(), state, message)
  end
  -- set last values to current values
  lastState = state
end
function rule()
  local temperature = nil
  if gwaTrigger == "MQTT" then
    temperature = gwaData.value
    lastTemperature = temperature
  elseif gwaTrigger == "CFG-CHANGE" then
    temperature = lastTemperature
  else
    gwaLogError(gwaTrigger)
    gwaLogError("Unknown trigger")
    return
  end
  local did = "mydev"
  local threshold = gwaLoadBusinessParam(did, "alert_threshold", 120)
  if temperature ~= nil then
    evaluateAlert(temperature, threshold, did)
  end
end
Rule manifest file:
{
  "13-on-config-change": {
    "triggers": {
      "on-publish": ["v1/sb/mydev/measurements/temperature-config"],
      "on-config-change": true
    },
    "file": "examples/13-on-config-change.lua"
  }
}
Description:
There are two rule triggers:
- Data publication: the Rule Engine checks if the new temperature value crosses the alert threshold, which is loaded by the gwaLoadBusinessParam function.
- Change of business parameters configuration: the Rule Engine reloads the alert threshold by means of the gwaLoadBusinessParam function. Then, it checks if the last temperature value, saved as a context persistent variable, crosses the new threshold.
If the alert state is different from the lastState value, a new alert is published.
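The interplay of the two triggers can be traced with a small simulation. In the sketch below, plain local variables stand in for the persistent context, and the functions onMeasurement and onConfigChange are hypothetical counterparts of the two branches of the rule; no gwa* functions are called:

```lua
-- Plain variables standing in for the persistent context (assumption)
local lastState = "none"
local lastTemperature = nil

-- Returns true when the alert state changed and a new alert would be published.
local function evaluate(temperature, threshold)
  local state = temperature > threshold
  local changed = lastState ~= state
  lastState = state
  return changed
end

-- Counterpart of the data publication trigger
local function onMeasurement(temperature, threshold)
  lastTemperature = temperature
  return evaluate(temperature, threshold)
end

-- Counterpart of the configuration change trigger
local function onConfigChange(threshold)
  if lastTemperature == nil then
    return false  -- nothing measured yet, nothing to re-evaluate
  end
  return evaluate(lastTemperature, threshold)
end

print(onMeasurement(115, 120))  -- first evaluation, state becomes false
print(onConfigChange(110))      -- threshold lowered below the last temperature
print(onConfigChange(110))      -- same threshold again, state unchanged
```

The second call shows why the last temperature is kept: lowering the threshold alone is enough to change the alert state, even though no new measurement has arrived.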
Configuration Tasks
In this example, the Rule Engine generates a configuration task, which changes a device's register value.
Use this script to handle remote configuration changes of the BACnet Protocol Adapter.
The generated task is then handled by the Task Executor, which forwards it to the southbound interface, to Protocol Adapters. See here for a task script example.
The rule is triggered when business configuration parameters are changed.
Rule body (file name: 14-config-tasks.lua):
-- target device ID (set in Protocol Adapter configuration):
did = "my-device-id"
-- originator options may be useful if task-update support is needed:
originator_name = "gwa-rule-engine"
originator_instance = ""
originator_id = ""
function setup()
  gwaDeclarePersistent("registerLastValue", {})
end
function rule()
  if gwaTrigger == "CFG-CHANGE" then
    local registers = gwaLoadBusinessParam(did, "registers", nil)
    if registers ~= nil then
      for registerName, registerValue in pairs(registers) do
        -- generate config-task only if value is defined in business-params and changed
        if registerValue ~= registerLastValue[registerName] then
          local details = {}
          -- create structure of config-task
          details["name"] = registerName
          details["parameters"] = {}
          details["parameters"][1] = {}
          details["parameters"][1]["id"] = registerName
          details["parameters"][1]["value"] = registerValue
          details["originator_info"] = {}
          details["originator_info"]["originator"] = originator_name
          details["originator_info"]["instance"] = originator_instance
          details["originator_info"]["id"] = originator_id
          details["originator_info"]["ts"] = gwaGetTimestamp()
          gwaSubmitTask(did, "config-task", details)
        end
      end
      registerLastValue = registers
    else
      gwaLogWarning(did.." -> registers object is not defined!")
    end
  else
    gwaLogError(gwaTrigger)
    gwaLogError("Trigger unsupported by rule script.")
    return
  end
end
Rule manifest file:
{
  "14-config-tasks": {
    "triggers": {
      "on-config-change": true
    },
    "file": "examples/14-config-tasks.lua"
  }
}
Description:
- In this example, the BACnet Protocol Adapter register configuration is changed by means of a generated config-task.
- The device UUID is my-device-id. Remember to configure the same UUID in the BACnet Protocol Adapter's configuration.
- The register configuration is defined as configurable business parameters:
"my-device-id": {
  "registers": {
    "bin0_polarity": 0,
    "bin1_polarity": 1
  }
}
You can set these business parameters from the relayr Cloud using this request:
curl -X POST "https://cloud.stg.az.relayr.io/devices/my-device-id/configurations" -H "accept: application/json" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" -d '{"registers":{"bin0_polarity":0, "bin1_polarity": 1}}'
Remember to configure the same parameters in the BACnet Protocol Adapter's configuration.
- The rule is triggered when the configuration of business parameters is changed, as defined by the "on-config-change": true trigger in the rule manifest file.
- The Rule Engine compares the newest register value with the latest value saved in the persistent storage, which is declared by gwaDeclarePersistent("registerLastValue", {}).
- If the register value has changed in the configuration of business parameters, the Rule Engine generates a config-task, which is then handled by the Task Executor to forward it to the southbound interface.
Remember to include the same task name in the task manifest file. In this rule example, the task name is config-task. See here for an example of a task forwarding script.
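The comparison of register values and the structure of the task details can be tried in a plain Lua interpreter. The sketch below is an illustration only: the helper names changedRegisters and buildDetails are hypothetical, and os.time() merely stands in for gwaGetTimestamp(), whose timestamp format is not assumed here:

```lua
-- Returns the names of registers whose value differs from the last known value.
local function changedRegisters(current, last)
  local changed = {}
  for name, value in pairs(current) do
    if last[name] ~= value then
      changed[#changed + 1] = name
    end
  end
  table.sort(changed)  -- pairs() has no defined order; sort for stable output
  return changed
end

-- Builds the same details structure as the rule, using a table constructor.
local function buildDetails(name, value, ts)
  return {
    name = name,
    parameters = { { id = name, value = value } },
    originator_info = { originator = "gwa-rule-engine", instance = "", id = "", ts = ts },
  }
end

local last = { bin0_polarity = 0, bin1_polarity = 1 }
local current = { bin0_polarity = 0, bin1_polarity = 0 }

for _, name in ipairs(changedRegisters(current, last)) do
  -- os.time() is a placeholder for the timestamp used by the Rule Engine
  local details = buildDetails(name, current[name], os.time())
  print(details.name, details.parameters[1].value)
end
```

Here only bin1_polarity differs from its last known value, so a single task would be generated.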
Rule Blacklist File
The rule blacklist file contains a list of disabled rules. If a rule is listed in this file, the Rule Engine does not execute it when it is triggered.
Blacklisting a rule is a safety guard for the rule execution time. It allows you to disable rules that took too long to execute.
A rule may be blacklisted in the following ways:
- automatically - When the rule took too long to execute, the Rule Engine automatically adds it to the rule blacklist file and logs a detailed error. You can define both a global maximum rule execution time (max_exec_time) in the engine section of the configuration file and a maximum execution time for individual rules in the rule manifest file.
A max_exec_time defined for an individual rule overrides the global max_exec_time setting.
- manually - By editing the blacklist file by hand.
The blacklist file is read once, at application startup, and saved each time a rule is blacklisted or at application shutdown. If you edit the file manually, restart the Rule Engine to apply your changes:
systemctl restart gwa-rule-engine-cpp
The default rule blacklist file name is rule-blacklist.json, but you can configure a different path to the file in the blacklist_file option in the Rule Engine configuration file.
The file contains a JSON encoded list of rule names, e.g.:
[
"<unique-rule-name-1>",
"<unique-rule-name-2>",
"<unique-rule-name-3>",
...
]
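The relation between the global limit, a per-rule limit, and the blacklist can be sketched as follows. This is a hypothetical illustration of the documented behavior, not the Rule Engine's internal implementation; the rule names, the function names, and the values are made up, and the elapsed time is assumed to use the same unit as max_exec_time:

```lua
-- Hypothetical illustration of the documented behavior, not the engine's internals.
local globalMaxExecTime = 100               -- "max_exec_time" from the engine section
local blacklist = { ["slow-rule"] = true }  -- rule names kept as a set for quick lookup

-- A per-rule limit, when present, takes precedence over the global one.
local function effectiveLimit(ruleLimit)
  if ruleLimit ~= nil then
    return ruleLimit
  end
  return globalMaxExecTime
end

-- Adds the rule to the blacklist when it exceeded its limit and
-- returns whether the rule is blacklisted afterwards.
local function checkExecution(ruleName, elapsed, ruleLimit)
  if elapsed > effectiveLimit(ruleLimit) then
    blacklist[ruleName] = true
  end
  return blacklist[ruleName] == true
end

print(checkExecution("fast-rule", 20))        -- within the global limit
print(checkExecution("heavy-rule", 150, 500)) -- within its own, higher limit
print(checkExecution("heavy-rule", 150))      -- exceeds the global limit
```

Once a rule is in the set, it stays there regardless of later execution times, which mirrors the need to edit the blacklist file and restart the Rule Engine to enable the rule again.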
Cron Expressions
A cron expression is one of the trigger types that can execute a rule in the Rule Engine. It is specified in the on-tick field in the rule manifest file.
A cron expression is a string that consists of 6 fields separated by white spaces. All fields are mandatory.
For each field, there is a set of allowed values and special characters. The table below presents the values allowed for each field. For a description of the special characters, see the Allowed Special Characters section.
For examples of expressions, see the Examples section.
Here is an overview of the fields and their allowed values:
Field Name | Allowed Values | Allowed Special Characters |
---|---|---|
Seconds | 0-59 | , - * / |
Minutes | 0-59 | , - * / |
Hours | 0-23 | , - * / |
Days of the month | 1-31 | , - * ? / |
Month | 1-12 or JAN-DEC | , - * / |
Days of the week | 0-7 or MON-SUN (0 and 7 mean Sunday) | , - * ? / |
The names of months and days of the week are not case sensitive, e.g. MON and mon mean the same.
Allowed Special Characters
The allowed special characters have the following meaning:
Character | Meaning | Examples |
---|---|---|
* | All possible values | * in the minute field means every minute. |
? | No specific value | To execute a rule on a particular day of the month (e.g. the 12th) regardless of the day of the week, enter 12 in the day of month field and ? in the day of week field. If you specify days of the week, put ? in the day of month field. |
- | Range of values | 9-11 in the hour field means the hours 9, 10 and 11. |
, | Additional values | MON,WED,FRI in the day of week field means the days Monday, Wednesday, and Friday. |
/ | Increments of values | 0/15 in the seconds field means the seconds 0, 15, 30, and 45. 5/15 in the seconds field means the seconds 5, 20, 35, and 50. |
Examples
Here are some examples of schedules that trigger rules:
Expression | Meaning |
---|---|
0/5 * * * * * | Every 5 seconds |
0 0/5 * * * * | Every 5 minutes |
0 0 8 * * ? | Every day at 8 a.m. |
0 0 6,22 ? * MON-FRI | At 6 a.m. and 10 p.m. from Monday to Friday |
0 10 7 4 * ? | At 7:10 a.m. on the fourth day of every month |
0 0 22 1/3 * ? | At 10 p.m. every third day of the month, starting on the first day of the month |
0 20 6 ? JUN-SEP 1 | At 6:20 a.m. on Monday, from June to September |
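The meaning of the special characters can be made concrete by expanding a single field into the values it matches. The sketch below is not the Rule Engine's parser: the function expandField is hypothetical, it handles only numeric fields with the *, -, , and / characters, and it does not cover names such as MON or JAN, the ? character, or range validation:

```lua
-- Expands one numeric cron field into the sorted list of matching values.
-- Handles "*", "a", "a-b", "a,b,c", "a/n" and "*/n" only.
local function expandField(field, min, max)
  local seen, values = {}, {}
  for part in field:gmatch("[^,]+") do
    -- split an optional "/step" suffix from the range part
    local range, stepText = part:match("^([^/]+)/(%d+)$")
    local hasStep = stepText ~= nil
    range = range or part
    local step = hasStep and tonumber(stepText) or 1
    assert(step >= 1, "step must be at least 1: " .. part)

    local first, last
    if range == "*" then
      first, last = min, max
    else
      local a, b = range:match("^(%d+)-(%d+)$")
      if a then
        first, last = tonumber(a), tonumber(b)
      else
        first = assert(tonumber(range), "unsupported field: " .. part)
        -- "a/n" runs from a to the end of the allowed range; a plain "a" is one value
        last = hasStep and max or first
      end
    end

    for v = first, last, step do
      if not seen[v] then
        seen[v] = true
        values[#values + 1] = v
      end
    end
  end
  table.sort(values)
  return values
end

print(table.concat(expandField("0/15", 0, 59), ","))  -- seconds field
print(table.concat(expandField("9-11", 0, 23), ","))  -- hours field
print(table.concat(expandField("6,22", 0, 23), ","))  -- hours field
```

The second and third arguments are the lowest and highest value allowed in the field, as listed in the table of fields, e.g. 0 and 59 for seconds or 1 and 31 for days of the month.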
Configuration File Example
{
  "mqtt_connection": {
    "host": "127.0.0.1",
    "port": 1883,
    "keepalive": 20,
    "max_inflight_messages": 100,
    "qos_levels": {
      "measurements": 0,
      "alerts": 1,
      "configuration": 1,
      "metadata": 1
    }
  },
  "http_connection": {
    "host": "127.0.0.1",
    "port": 22080,
    "use_ssl": false,
    "ca_file": "/etc/ssl/certs/ca-certificates.crt",
    "auth_enable": false,
    "username": "",
    "password": "",
    "http_timeout": 10
  },
  "pmq_connection": {
    "input_queues": ["/gwa-rule-engine"],
    "message_size": 8192,
    "message_count": 10
  },
  "rest": {
    "host": "127.0.0.1",
    "port": 25080
  },
  "health_monitor": {
    "heartbeat_interval": 60
  },
  "engine": {
    "manifest_file": "rule-manifest.json",
    "blacklist_file": "rule-blacklist.json",
    "business_params_file": "business-params.json",
    "cache_dir": ".",
    "executor_pool_size": 5,
    "max_exec_time": 100,
    "rule_log_severity": "INFO"
  }
}