GWA Analytics
The GWA Analytics enables a wide range of custom data processing tasks, such as anomaly detection or data volume reduction, implemented by means of Python scripts. Scripts can be executed asynchronously when new data is published, or according to a schedule defined in a cron-like language.
The GWA Analytics is only available for the Debian Buster Linux distribution. For a step-by-step installation guide, see here.
Configuring GWA Analytics
The GWA Analytics has two configuration files:
Deployment configuration file - Configuration of the MQTT connection, PMQ connection, REST API, and health monitoring.
Application configuration file - Configuration of communication with Python subprocesses, script definitions, and script triggers.
When you modify the GWA Analytics configuration, restart the service to apply the changes by means of the following command:
systemctl restart gwa-analytics
For an example of the GWA Analytics configuration files, see here.
Deployment Configuration File
The GWA Analytics deployment configuration file is located at:
/etc/relayr/gwa-analytics/gwa-analytics-config.json
The file contains the following configuration settings:
mqtt_connection:
Configuration Option | Description | Default Value |
---|---|---|
host | Hostname or IP address of the MQTT broker. | 127.0.0.1 |
port | Port number for connecting to the MQTT broker. | 1883 |
keepalive | MQTT connection keep-alive interval. | 20 |
max_inflight_messages | The maximum number of QoS 1 or 2 messages that can be in the process of being transmitted simultaneously. | 100 |
qos_levels | Quality of Service level for each traffic type. | measurements: 0, alerts: 1, configuration: 1, metadata: 1 |
rest:
Configuration Option | Description | Default Value |
---|---|---|
host | IP address on which the REST API is served. | 127.0.0.1 |
port | Port number on which the REST API is served. | 27080 |
health_monitor:
Configuration Option | Description | Default Value |
---|---|---|
heartbeat_interval | Time interval at which the periodic heartbeat messages are sent, given in seconds. For details, see the Edge Platform Monitoring section. | 60 |
pmq_connection - Configure this section to use the High Speed Bus (PMQ) communication channel:
Configuration Option | Description | Default Value |
---|---|---|
input_queues | List of PMQ queues to monitor for inbound messages. | ["/gwa-analytics"] |
output_queues | List of PMQ queues for outbound messages. | [] |
message_size | Maximum size of a PMQ message, given in bytes. | 8192 |
message_count | Maximum number of messages in a PMQ queue. | 10 |
Application Configuration File
The GWA Analytics application configuration file is located at:
/etc/relayr/gwa-analytics/gwa-analytics-application-config.json
The file contains the following configuration settings:
Configuration Option | Description | Example of Value |
---|---|---|
grpc_server_address | Address of the socket used to communicate with Python worker subprocesses. Unix domain sockets are recommended. | unix:///tmp/gwa-analytics.sock |
module_path | Global path to the directory containing Python modules. For a specific script, you can also configure a module path that overrides the global setting. See here for more information. | /usr/share/relayr/gwa-analytics/modules |
subprocess_executable_path | Path to the Python worker subprocess executable. | /usr/bin/gwa-analytics-python-subprocess |
workspace_path | Path to the working directory of the scripts. Each script creates its own workspace within that directory. | /var/cache/relayr/gwa-analytics/workspace |
venv_path | Optional. Path to the virtual environment used by the interpreter. See here for more information. | /var/lib/relayr/gwa-analytics/gwa-analytics-venv/ |
scripts | Configuration of scripts and their triggers. See here for more information. | |
cached-topics | List of MQTT and PMQ topics to cache. See here for more information. | |
Configuring Scripts
The scripts section of the application configuration file lets you define a set of scripts and their triggers.
For each script, provide a unique name of your choice and the following settings:
- module_name - Name of the Python module to execute, e.g. module_1.
- triggers - Criteria that trigger the script execution. Possible triggers are:
Configuration Option | Description | Example of Value |
---|---|---|
on-mqtt-publish | Set of MQTT topics which are monitored for data publication. | ["mqtt_topic1", "mqtt_topic2"] |
on-pmq-publish | Set of PMQ topics which are monitored for data publication. | ["pmq_topic1", "pmq_topic2"] |
on-tick | Schedules for executing the script in a cron-like manner. The scheduling is handled by the ccronexpr library. See here for more information and examples. | ["*/5 * * * * *", "0 0/5 * * * *"] |
on-business-parameters-change | If set to true , the script is executed when the configuration of business parameters changes. See here for more information. | true |
You can define more than one trigger type for a script, and you can combine all trigger types freely. For example, if you configure both the on-mqtt-publish and on-tick triggers for a script, it is executed according to the defined schedule and also when new data arrives on the MQTT topics you specify.
Optionally, you can also configure the following settings for each script:
Configuration Option | Description | Example of Value |
---|---|---|
module_path | Directory containing the module. It overrides the global module_path setting. | modules1 |
log_level | Script log verbosity level. Possible values: trace, debug, info, warning, error, critical, off. The default is info. To disable logging, set it to off. For more information on the script logging facility, see here. | info |
Example:
"scripts":
{
    "script4":
    {
        "module_name": "module_1",
        "triggers": {
            "on-mqtt-publish": ["mqtt_topic1"],
            "on-pmq-publish": ["pmq_topic1"],
            "on-tick": ["*/5 * * * * *"]
        },
        "log_level": "info",
        "module_path": "modules1"
    },
    "unique-script-name-2":
    {
        "module_name": "module_2",
        "triggers": {
            "on-mqtt-publish": ["mqtt_topic2"],
            "on-business-parameters-change": true
        }
    }
}
Caching MQTT and PMQ Topics
In the cached-topics section of the application configuration file, you can list MQTT and PMQ topics which you want to cache.
The last message received on each of these topics is stored, and you can retrieve it in scripts using the load_cached_topic function.
Topic names must be unique across all message types.
Example:
{
    "cached-topics":
    {
        "mqtt": ["topic-1", "topic-2"],
        "pmq": ["topic-3", "topic-4"]
    }
}
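In a script, the cached message can then be read back with load_cached_topic. The following sketch assumes the topic-1 topic from the configuration above; the ImportError branch is a stand-in used only to exercise the module outside GWA Analytics and is not part of the real gwa API:

```python
try:
    import gwa  # available only inside the GWA Analytics runtime
except ImportError:
    # Minimal stand-in mimicking load_cached_topic for local testing
    class gwa:
        _cache = {"topic-1": '{"value": 21.5}'}

        @staticmethod
        def load_cached_topic(topicName):
            return gwa._cache.get(topicName)

def function(trigger, topic, data):
    last = gwa.load_cached_topic("topic-1")
    if last is None:
        # No message has been received on the cached topic yet
        print("No message cached yet on topic-1")
    else:
        print("Last message on topic-1: {}".format(last))
```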
Supported Functions
The sections below present functions that you can use in your Python scripts. The main script function is mandatory. You can also use helper functions of the GWA module.
Main Script Function
The GWA Analytics executes scripts by means of an embedded Python 3 interpreter.
The interpreter expects each module to export at least a function with the signature function(trigger: execution_trigger, topic: str, data: str), which is executed every time the module is triggered.
On execution, the function is called with the following arguments:
- trigger - Enum value representing the trigger source. Possible values: MQTT, PMQ, CRON, BUSINESS_CFG_CHANGE.
- topic and data - Strings containing the trigger data, if available.
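A minimal module implementing this contract might look as follows; the ImportError branch is only a stand-in so the file can also be run outside GWA Analytics, and the topic name and payload shape are hypothetical:

```python
import json

try:
    import gwa  # available only inside the GWA Analytics runtime
    log_info = gwa.log_info
except ImportError:
    # Fallback logger so the module can be exercised outside GWA Analytics
    def log_info(entry, *args, **kwargs):
        print(entry.format(*args, **kwargs))

def function(trigger, topic, data):
    # topic and data carry the trigger payload for MQTT/PMQ triggers;
    # for on-tick triggers they may be empty
    if data:
        payload = json.loads(data)
        log_info("Received {} on topic {}", payload, topic)
    else:
        log_info("Triggered without a payload")
```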
GWA Module Functions
Each script has access to the gwa module (import gwa), which contains the following functions and variables that provide integration with the Gateway Agent:
Function | Description |
---|---|
declare_persistent(varName: str, varInitialValue) | Declares a persistent global variable. See here for more information. |
declare_shared_rw(name: str, initialValue) | Declares a writeable shared variable. See here for more information. |
declare_shared_ro(name: str, initialValue) | Declares a read-only shared variable. See here for more information. |
load_cached_topic(topicName: str) | Returns the last message received on the cached topicName topic. When no message is cached, returns None . See here for more information. |
JsonPointer(*args) | Helper class for building RFC6901 compliant pointers. See here for details. |
get_business_parameter(pointer: str, default=any) | Returns a business parameter defined by the pointer (RFC6901-compliant syntax). default is an optional named argument that is returned if the parameter's value is not present; if it is not provided, an error is thrown. See here for details. |
get_business_parameter(pointer: gwa.JsonPointer, default=any) | Returns a business parameter defined by the pointer. default is an optional named argument that is returned if the parameter's value is not present. If not provided, an error is thrown. See here for details. |
check_business_parameter(pointer: str) | Returns true if the parameter defined by the pointer (RFC6901-compliant syntax) exists. Otherwise, returns false. See here for details. |
check_business_parameter(pointer: gwa.JsonPointer) | Returns true if the parameter defined by the pointer exists. Otherwise, returns false . See here for details. |
publish(topic: str, data) | Publishes data in the JSON format to a given MQTT topic. |
publish_raw(topic: str, data: bytes) | Publishes raw binary data to a given MQTT topic. |
publish_measurement(prefix: str, deviceId: str, id: str, timestamp: int, value) | Publishes measurement data. |
publish_alert(prefix: str, deviceId: str, id: str, timestamp: int, set: bool, message: str) | Publishes alert data. |
publish_log(prefix: str, deviceId: str, id: str, timestamp: int, severity: str, message: str) | Publishes log data. |
publish_metadata(prefix: str, deviceId: str, id: str, timestamp: int, value) | Publishes device metadata. |
publish_config(prefix: str, deviceId: str, id: str, timestamp: int, value) | Publishes configuration data. |
publish_peer(prefix: str, deviceId: str, id: str, timestamp: int, value) | Publishes peer data. |
publish_task_update(prefix: str, deviceId: str, id: str, type: str, finished: bool, details: dict) | Publishes task update data. |
publish_complex_measurement(prefix: str, deviceId: str, id: str, tsStart: int, tsEnd: int, xStart: float, xStep: float, values: list) | Publishes complex measurement data. |
publish_location(prefix: str, deviceId: str, timestamp: int, value) | Publishes location data. |
log_trace(logEntry: str, *args, **kwargs) | Logs a message with the TRACE severity, supports str.format() style formatting. See here for more information. |
log_debug(logEntry: str, *args, **kwargs) | Logs a message with the DEBUG severity, supports str.format() style formatting. See here for more information. |
log_info(logEntry: str, *args, **kwargs) | Logs a message with the INFO severity, supports str.format() style formatting. See here for more information. |
log_warning(logEntry: str, *args, **kwargs) | Logs a message with the WARNING severity, supports str.format() style formatting. See here for more information. |
log_error(logEntry: str, *args, **kwargs) | Logs a message with the ERROR severity, supports str.format() style formatting. See here for more information. |
log_critical(logEntry: str, *args, **kwargs) | Logs a message with the CRITICAL severity, supports str.format() style formatting. See here for more information. |
get_timestamp() | Returns the current UNIX epoch timestamp in milliseconds. |
execution_trigger | Enum representing the trigger source. Possible values: MQTT , PMQ , CRON , BUSINESS_CFG_CHANGE . |
nb | String representing the northbound prefix v1/nb. |
sb | String representing the southbound prefix v1/sb. |
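As an illustration of the publishing helpers, the following sketch emits a single measurement on the northbound prefix. The device and measurement IDs are hypothetical, and the ImportError branch is a local-testing stand-in that records calls instead of publishing; it is not part of the real gwa API:

```python
try:
    import gwa  # available only inside the GWA Analytics runtime
    get_timestamp = gwa.get_timestamp
except ImportError:
    import time

    get_timestamp = lambda: round(time.time() * 1000)

    # Stand-in that records calls instead of publishing, for local testing
    class gwa:
        nb = "v1/nb"
        published = []

        @staticmethod
        def publish_measurement(prefix, deviceId, id, timestamp, value):
            gwa.published.append((prefix, deviceId, id, timestamp, value))

def function(trigger, topic, data):
    # Publish one temperature reading; "device-1" and "temperature"
    # are example identifiers
    gwa.publish_measurement(gwa.nb, "device-1", "temperature",
                            get_timestamp(), 21.5)
```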
Using Setup and Cleanup Functions
The setup() and cleanup() functions let you make changes to the script environment. You can optionally define them in the script file alongside the main function.
Unlike the main script function, which is executed each time the script is triggered, the setup() and cleanup() functions are executed only once each: after the application's start and before its shutdown, respectively.
Setup Function
Use the setup() function to perform any actions required to initialize the script environment and ensure it operates correctly. The setup() function is called only once, between the application's start and the first execution of the main script function.
Example:
def setup():
    print("Setting up after the start")
Cleanup Function
Use the cleanup() function to perform any actions required to revert the script environment to its initial state. The cleanup() function is called only once, between the last execution of the main script function and the application's shutdown.
Example:
def cleanup():
    print("Cleaning up before the shutdown")
Creating Scripts with Configurable Parameters
The GWA Analytics lets you supply scripts with a set of user-defined parameters, which configure settings specific to a business domain, e.g. alert thresholds.
Configuring Business Parameters
You can define and modify business parameters via the GWA Analytics REST API, configured in the rest section of the deployment configuration file. For details, refer to the API documentation.
Example:
{
    "parameters": {
        "parameter-1": 143,
        "parameter-2": 14
    },
    "other-parameters": {}
}
Using Business Parameters in Scripts
The parameters are stored in JSON format, and you can access them using the JSON Pointer notation defined by RFC6901.
To access the configured parameters in scripts, use the gwa.get_business_parameter function. If the parameter value is not defined via the REST interface, the default value supplied in the call to gwa.get_business_parameter is used.
You can check whether a parameter's value was defined by using the gwa.check_business_parameter helper function.
Example:
import gwa

def function(trigger, topic, data):
    print("Scheduled execution at: {}".format(gwa.get_timestamp()))

    if gwa.check_business_parameter('/example/param1'):
        print('Parameter set')
    else:
        print('Parameter not set')

    gwa_pointer = gwa.JsonPointer('example', 'param2')
    if gwa.check_business_parameter(gwa_pointer):
        print('Parameter set')
    else:
        print('Parameter not set')

    param = gwa.get_business_parameter('/example/param3', default='default value')
    print(param)

    gwa_pointer2 = gwa.JsonPointer('example', 'param4')
    param = gwa.get_business_parameter(gwa_pointer2, default='default value')
    print(param)
Triggering Scripts on Business Configuration Change
You can trigger a script execution when the configuration of business parameters changes. To do so, use the on-business-parameters-change trigger type in the script configuration.
Example:
"scripts": {
    "business_parameters_trigger": {
        "module_name": "business_parameters_trigger",
        "triggers": {
            "on-business-parameters-change": true
        }
    }
}
In this case, the script trigger is: trigger == gwa.execution_trigger.BUSINESS_CFG_CHANGE.
Example:
import gwa

def function(trigger, topic, data):
    if trigger == gwa.execution_trigger.CRON:
        print('Scheduled execution at: {}'.format(gwa.get_timestamp()))
        # the pointer argument is required; '/example/param1' is an example
        print(gwa.get_business_parameter('/example/param1', default=None))
    elif trigger == gwa.execution_trigger.BUSINESS_CFG_CHANGE:
        print('Configuration changed at: {}'.format(gwa.get_timestamp()))
        print(gwa.get_business_parameter('/example/param1', default=None))
Creating Context Persistent Variables
The GWA Analytics allows you to preserve values of variables not only between subsequent script executions, but also across GWA Analytics restarts.
To create a context persistent variable, use the declare_persistent function.
Example:
gwa.declare_persistent("variable1", {"id": "test"})
A call to this function creates a global variable within the gwa module with the name passed in the first parameter, and initializes it either to the value passed in the second parameter (during the first script execution) or to the value read from persistent storage (during all subsequent executions).
When the script execution is finished, the GWA Analytics saves the values of all variables declared as persistent in the persistent storage. The state of variables is saved inside the script's workspace, which you can configure via the workspace_path setting of the application configuration file.
The GWA Analytics uses Python's pickle module in the backend, which enables storing any value type that is picklable.
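For example, a persistent execution counter might look as follows. The ImportError branch is a stand-in for local testing only and mimics just the declaration semantics; real persistence across restarts works only inside GWA Analytics:

```python
try:
    import gwa  # available only inside the GWA Analytics runtime
except ImportError:
    # Stand-in for local testing: declares the variable once, but does
    # not actually persist it to disk like the real GWA Analytics does
    class gwa:
        @staticmethod
        def declare_persistent(varName, varInitialValue):
            if not hasattr(gwa, varName):
                setattr(gwa, varName, varInitialValue)

# Initialized to 0 on the very first run, restored from the script's
# workspace on every later run, including after service restarts
gwa.declare_persistent("execution_count", 0)

def function(trigger, topic, data):
    gwa.execution_count += 1
    print("Executed {} time(s) since first deployment".format(gwa.execution_count))
```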
Creating Shared Variables
Shared variables allow scripts to share data on a producer-consumer basis.
You can declare a shared variable by means of the following functions:
- declare_shared_rw(name: str, initialValue) - Declares a writeable shared variable.
- declare_shared_ro(name: str, initialValue) - Declares a read-only shared variable.
The GWA Analytics uses Python's pickle module in the backend, which enables storing any value type that is picklable.
Producer Setup
A producer initializes the shared variable using the declare_shared_rw function. Each variable can have only one producer.
This creates a variable within the gwa module. The value of this variable is transferred to the shared cache at the end of every triggered execution.
Module Example:
import gwa

gwa.declare_shared_rw("buffer", [0 for _ in range(10)])
ix = 0

def function(source, topic, data):
    global ix
    gwa.buffer = gwa.buffer[1:] + [ix]
    ix = ix + 1
Consumers Setup
Consumers initialize the shared variable using the declare_shared_ro function.
This creates a variable within the gwa module. The value of this variable is fetched from the shared cache before every triggered execution.
Module Example:
import gwa

gwa.declare_shared_ro("buffer", [0 for _ in range(10)])

def function(source, topic, data):
    print(gwa.buffer)
Using Script Logging Facility
When creating a script, you can use a built-in logger for debugging or preserving important information.
To log the script information, follow these steps:
- In the Python file, define which information to log, using the log functions. See the GWA Module Functions section for a list of functions.
Example:
gwa.log_error("No cached values")
- Configure the log_level in the script configuration in the application configuration file. If you don't configure it, the logger uses the default info value. Based on this setting, the built-in logger only logs information at this level or higher.
The log_level in the script configuration must be aligned with the global log verbosity level of the GWA Analytics. For example, to log information at the debug level, both the log_level in the script configuration and the global GWA Analytics log level must be at least debug.
You can find the logged entries in the GWA Analytics logs. Each entry contains the time, file name and line number of the call to the logger.
Python Logging Compatibility
The GWA Analytics provides limited compatibility with Python's logging facilities by means of the gwa.logging submodule.
Function | Description |
---|---|
debug(msg, *args, **kwargs) | Delegates to the log_debug function. *args are used for %-style formatting. kwargs are discarded. |
info(msg, *args, **kwargs) | Delegates to the log_info function. *args are used for %-style formatting. kwargs are discarded. |
warning(msg, *args, **kwargs) | Delegates to the log_warning function. *args are used for %-style formatting. kwargs are discarded. |
error(msg, *args, **kwargs) | Delegates to the log_error function. *args are used for %-style formatting. kwargs are discarded. |
critical(msg, *args, **kwargs) | Delegates to the log_critical function. *args are used for %-style formatting. kwargs are discarded. |
Usage Example:
try:
    import gwa
    import gwa.logging as logging
    get_timestamp = gwa.get_timestamp
except ImportError:
    import logging as logging
    import time
    get_timestamp = lambda: round(time.time() * 1000)

def function(trigger, topic, data):
    logging.debug('Scheduled execution at: %d', get_timestamp())
    logging.info('Scheduled execution at: %d', get_timestamp() + 1)
    logging.warning('Scheduled execution at: %d', get_timestamp() + 2)
    logging.error('Scheduled execution at: %d', get_timestamp() + 3)
    logging.critical('Scheduled execution at: %d', get_timestamp() + 4)

if __name__ == "__main__":
    # Not executed by GWA Analytics; only runs when the file is invoked
    # directly through the Python interpreter
    logging.getLogger().setLevel(logging.DEBUG)
    function(None, None, None)
Enabling Python Virtual Environments
The GWA Analytics component supports Python virtual environments.
If you specify the venv_path setting in the application configuration file, the embedded interpreter uses that venv to run the Python code.
In the standard gwa-analytics deb package, the venv is located at /var/lib/relayr/gwa-analytics/gwa-analytics-venv/, but you can create a different virtual environment and point venv_path to it.
Only the relayr user can access that directory, so you need to switch users to install packages, e.g.: sudo -u relayr /var/lib/relayr/gwa-analytics/gwa-analytics-venv/bin/pip3 install numpy.
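To use a virtual environment, point the venv_path setting in the application configuration file at it and restart the service. A minimal fragment (the path shown is the package default; replace it with your own venv's path if you created one):

```json
{
    "venv_path": "/var/lib/relayr/gwa-analytics/gwa-analytics-venv/"
}
```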
Cron Expressions
A cron expression is one of the trigger types that can execute a script. To use it, include the on-tick trigger in the script configuration.
A cron expression is a string that consists of six fields separated by white space. All fields are mandatory.
For each field, there is a set of allowed values and special characters. The table below presents the values allowed for each field. For a description of the special characters, see the Allowed Special Characters section.
For examples of expressions, see the Examples section.
Here is an overview of the fields and their allowed values:
Field Name | Allowed Values | Allowed Special Characters |
---|---|---|
Seconds | 0-59 | , - * / |
Minutes | 0-59 | , - * / |
Hours | 0-23 | , - * / |
Days of the month | 1-31 | , - * ? / |
Month | 1-12 or JAN-DEC | , - * / |
Days of the week | 0-7 or MON-SUN (0 and 7 mean Sunday) | , - * ? / |
The names of months and days of the week are not case sensitive, e.g. MON and mon mean the same.
Allowed Special Characters
The allowed special characters have the following meaning:
Character | Meaning | Examples |
---|---|---|
* | All possible values | * in the minute field means every minute. |
? | No specific value | To execute a script on a particular day of the month (e.g. the 12th), regardless of the day of the week, enter 12 in the day-of-month field and ? in the day-of-week field. If you specify days of the week, put ? in the day-of-month field. |
- | Range of values | 9-11 in the hour field means the hours 9, 10 and 11. |
, | Additional values | MON,WED,FRI in the day of week field means the days Monday, Wednesday, and Friday. |
/ | Increments of values | 0/15 in the seconds field means the seconds 0, 15, 30, and 45. 5/15 in the seconds field means the seconds 5, 20, 35, and 50. |
Examples
Here are some examples of schedules that trigger scripts:
Expression | Meaning |
---|---|
0/5 * * * * * | Every 5 seconds |
0 0/5 * * * * | Every 5 minutes |
0 0 8 * * ? | Every day at 8 a.m. |
0 0 6,22 ? * MON-FRI | At 6 a.m. and 10 p.m. from Monday to Friday |
0 10 7 4 * ? | At 7:10 a.m. on the fourth day of every month |
0 0 22 1/3 * ? | At 10 p.m. every three days, starting on the first day of each month |
0 20 6 ? JUN-SEP 1 | At 6:20 a.m. on Monday, from June to September |
Examples of Configuration Files
Deployment Configuration File
{
    "mqtt_connection": {
        "host": "127.0.0.1",
        "port": 1883,
        "keepalive": 20,
        "max_inflight_messages": 100,
        "qos_levels": {
            "measurements": 0,
            "alerts": 1,
            "configuration": 1,
            "metadata": 1
        }
    },
    "health_monitor": {
        "heartbeat_interval": 60
    },
    "pmq_connection": {
        "input_queues": ["/gwa-analytics"],
        "output_queues": [],
        "message_size": 8192,
        "message_count": 10
    },
    "rest": {
        "host": "127.0.0.1",
        "port": 27080
    }
}
Application Configuration File
{
    "grpc_server_address": "unix:///tmp/gwa-analytics.sock",
    "module_path": "/usr/share/relayr/gwa-analytics/modules",
    "subprocess_executable_path": "/usr/bin/gwa-analytics-python-subprocess",
    "workspace_path": "/var/cache/relayr/gwa-analytics/workspace",
    "scripts": {
        "script4": {
            "module_name": "module_1",
            "triggers": {
                "on-mqtt-publish": ["mqtt_topic1"],
                "on-pmq-publish": ["pmq_topic1"],
                "on-tick": ["*/5 * * * * *"]
            },
            "log_level": "info",
            "module_path": "modules1"
        },
        "unique-script-name-2": {
            "module_name": "module_2",
            "triggers": {
                "on-mqtt-publish": ["mqtt_topic2"]
            }
        }
    },
    "cached-topics": {
        "mqtt": ["test_mqtt_topic"],
        "pmq": ["test_pmq_topic"]
    }
}