Normalisation Daemon
The reporting normalisation daemon is a scheduled batch job that reads data from a source — either a SQL query or an HTTP endpoint — and writes each row into a destination table via a parameterised INSERT statement. It runs on a per-worker cron schedule and is the primary mechanism for populating aggregate and summary tables from live platform data.
How It Works
The daemon's outer tick runs every minute. Each worker in the config carries its own cron expression; on each tick, the daemon evaluates which workers are due and streams their items in order. For each source row, it executes the configured INSERT against the destination database. Processing is throttled and committed in configurable batches.
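The due-check performed on each tick can be illustrated with a minimal sketch. This is not the daemon's actual scheduler: `field_matches`, `is_due` and `due_workers` are hypothetical names, only `*`, lists, ranges and `/step` are handled, and all five fields are simply ANDed together (standard cron treats day-of-month and weekday specially when both are restricted).

```python
from datetime import datetime, timezone


def field_matches(expr: str, value: int, low: int) -> bool:
    """Return True if one cron field (e.g. '*/5', '1,15', '2-4') matches value."""
    for part in expr.split(","):
        rng, _, step = part.partition("/")
        step_n = int(step) if step else 1
        if rng == "*":
            start, end = low, None
        elif "-" in rng:
            a, b = rng.split("-")
            start, end = int(a), int(b)
        else:
            start = int(rng)
            end = start if not step else None
        if value < start or (end is not None and value > end):
            continue
        if (value - start) % step_n == 0:
            return True
    return False


def is_due(cron: str, now: datetime) -> bool:
    """Evaluate a 5-field cron expression against a timezone-aware time, in UTC."""
    minute, hour, day, month, weekday = cron.split()
    now = now.astimezone(timezone.utc)
    cron_weekday = now.isoweekday() % 7  # cron convention: 0 = Sunday
    return (
        field_matches(minute, now.minute, 0)
        and field_matches(hour, now.hour, 0)
        and field_matches(day, now.day, 1)
        and field_matches(month, now.month, 1)
        and field_matches(weekday, cron_weekday, 0)
    )


def due_workers(workers, now):
    """Return the workers whose cron expression matches this tick, in config order."""
    return [w for w in workers if is_due(w["cron"], now)]
```

With a one-minute tick, a worker configured as `*/5 * * * *` would be selected on every fifth tick, and one configured as `0 2 * * *` only on the tick at 02:00 UTC.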
Configuration
All workers are defined in a single JSON array stored in the reporting.normalisation.daemon.config platform property. Each element in the array is one worker.
Global settings
| Property | Default | Description |
|---|---|---|
| `reporting.normalisation.daemon.config` | `[]` | JSON array of worker config objects (see below) |
| `reporting.normalisation.daemon.commitFrequency` | `100` | Number of rows between JPA flush/commit cycles |
| `reporting.normalisation.daemon.maxTps` | `100` | Maximum rows processed per second across all active workers in a single tick |
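To make the interaction of the two tuning properties concrete, here is a rough sketch of a loop that paces inserts and commits in batches. It is an illustration under assumptions, not the daemon's code: `process_rows`, `insert_row` and `commit` are hypothetical names, and the pacing here applies to a single stream of rows rather than to all active workers.

```python
import time


def process_rows(rows, insert_row, commit, commit_frequency=100, max_tps=100,
                 clock=time.monotonic, sleep=time.sleep):
    """Insert rows one at a time, committing every commit_frequency rows and
    spacing inserts so that at most max_tps rows start per second."""
    min_interval = 1.0 / max_tps
    next_allowed = clock()
    pending = 0
    total = 0
    for row in rows:
        wait = next_allowed - clock()
        if wait > 0:
            sleep(wait)  # throttle: hold back until the next slot opens
        next_allowed = max(next_allowed, clock()) + min_interval
        insert_row(row)
        pending += 1
        total += 1
        if pending >= commit_frequency:
            commit()  # batch boundary reached
            pending = 0
    if pending:
        commit()  # flush the final partial batch
    return total
```

With the defaults, 250 rows would take roughly 2.5 seconds and produce three commits: after rows 100 and 200, plus a final one for the remaining 50.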
Worker Config Fields
Common fields (all types)
| Field | Required | Description |
|---|---|---|
| `cron` | Yes | Standard 5-field cron expression (minute, hour, day, month, weekday). Evaluated in UTC. Controls how often this worker runs. |
| `type` | Yes | Source type: `SQL` or `HTTP` |
| `insertSql` | Yes | Parameterised SQL INSERT statement. Use `?` placeholders for each value extracted from the source. Columns from the source map positionally to the `?` placeholders. |
SQL worker
Reads rows from a native SQL SELECT. Each column in the result row maps positionally to a ? in insertSql.
| Field | Required | Description |
|---|---|---|
| `sourceSql` | Yes | Native SQL SELECT query to run against the reporting database. Supports `${<sql>}` tokens (see below). |
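The positional mapping between result columns and `?` placeholders can be sketched with Python's built-in `sqlite3` module, which uses the same `?` placeholder style. This is only an illustration: `run_sql_worker` is a hypothetical name, an in-memory SQLite database stands in for the reporting database, and the source query contains no tokens.

```python
import sqlite3


def run_sql_worker(conn, source_sql, insert_sql):
    """Run source_sql and feed each result row, in column order, to insert_sql."""
    rows = conn.execute(source_sql).fetchall()
    for row in rows:
        # row[0] binds to the first ?, row[1] to the second, and so on.
        conn.execute(insert_sql, row)
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE payment (id INTEGER, wallet_id INTEGER, amount REAL,
                          created TEXT, processed INTEGER);
    CREATE TABLE payment_aggregate (source_id INTEGER, wallet_id INTEGER,
                                    amount REAL, created TEXT);
    INSERT INTO payment VALUES (1, 10, 5.0, '2024-01-01', 0);
    INSERT INTO payment VALUES (2, 11, 7.5, '2024-01-02', 0);
""")

count = run_sql_worker(
    conn,
    "SELECT id, wallet_id, amount, created FROM payment "
    "WHERE processed = 0 ORDER BY id",
    "INSERT INTO payment_aggregate (source_id, wallet_id, amount, created) "
    "VALUES (?, ?, ?, ?)",
)
```

Because the mapping is purely positional, the order of columns in the SELECT has to match the order of columns named in the INSERT.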
Example:
{
  "cron": "*/5 * * * *",
  "type": "SQL",
  "sourceSql": "SELECT id, wallet_id, amount, created FROM payment WHERE processed = 0 AND id > ${SELECT COALESCE(MAX(source_id), 0) FROM payment_aggregate}",
  "insertSql": "INSERT INTO payment_aggregate (source_id, wallet_id, amount, created) VALUES (?, ?, ?, ?)"
}
HTTP worker
Calls an external endpoint that returns a JSON array. JSONPath expressions extract the insert parameters from each element in the array.
| Field | Required | Description |
|---|---|---|
| `url` | Yes | Endpoint URL to call |
| `method` | Yes | HTTP method: `GET`, `POST`, etc. |
| `body` | No | Request body string. Supports `${<sql>}` tokens. |
| `headers` | No | JSON object of static request headers (key-value pairs) |
| `headerSupplier` | No | Fully-qualified class name of an `IHeaderSupplier` implementation that provides dynamic headers (e.g. a signed JWT). Supplier headers are merged on top of any static headers; the supplier wins on conflicts. |
| `jsonPaths` | Yes | JSON array of JSONPath expressions. Each expression extracts one value from a result element; values map positionally to the `?` placeholders in `insertSql`. |
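As an illustration of how `jsonPaths` turn a response into insert parameters, the sketch below resolves simple dotted paths against each element of a JSON array. It is a simplification under assumptions: `extract` and `to_insert_params` are hypothetical names, only paths of the form `$.a.b` are handled (no filters, wildcards or array indexes), and mapping a missing value to `None` is a choice made for this example, not documented daemon behaviour.

```python
import json


def extract(element, path):
    """Resolve a simple '$.a.b' path against one JSON element (None if absent)."""
    if not path.startswith("$"):
        raise ValueError(f"unsupported path: {path}")
    current = element
    for key in path[1:].strip(".").split("."):
        if key == "":
            continue  # bare '$' refers to the element itself
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def to_insert_params(response_text, json_paths):
    """Turn a JSON array response into one parameter tuple per element,
    ordered to match the ? placeholders in insertSql."""
    elements = json.loads(response_text)
    return [tuple(extract(el, p) for p in json_paths) for el in elements]


response = (
    '[{"id": "t1", "amount": 12.5, "currency": "ZAR", '
    '"createdAt": "2024-01-01T00:00:00Z"}]'
)
params = to_insert_params(
    response, ["$.id", "$.amount", "$.currency", "$.createdAt"]
)
```

Each tuple in `params` lines up, position by position, with the four `?` placeholders of an INSERT such as the one in the example that follows.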
Example:
{
  "cron": "0 * * * *",
  "type": "HTTP",
  "url": "https://partner-api.example.com/transactions",
  "method": "POST",
  "body": "{\"since\": \"${SELECT COALESCE(MAX(created_at), '1970-01-01') FROM partner_transactions}\"}",
  "headers": {
    "X-Api-Version": "2"
  },
  "headerSupplier": "com.ukheshe.services.reporting.daemon.PnGatewayHeaderSupplier",
  "jsonPaths": ["$.id", "$.amount", "$.currency", "$.createdAt"],
  "insertSql": "INSERT INTO partner_transactions (external_id, amount, currency, created_at) VALUES (?, ?, ?, ?)"
}
SQL Tokens
Both sourceSql (SQL type) and body (HTTP type) support ${<sql>} tokens. Before the source is queried or the HTTP request is sent, each token is replaced with the first column of the first row returned by running the embedded SQL against the destination database.
This is the standard pattern for watermarking — only pulling records the destination has not yet seen.
Examples:
-- Pull only new rows since the last inserted ID
SELECT id, amount FROM payments WHERE id > ${SELECT COALESCE(MAX(source_id), 0) FROM payment_aggregate}

-- Pull rows created after the latest record in the aggregate table
SELECT * FROM transactions WHERE created > ${SELECT COALESCE(MAX(source_created), '2000-01-01') FROM transaction_summary}

If the token query returns no rows, the token resolves to an empty string.
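Token substitution as described in this section can be sketched in a few lines. The code is an assumption-laden illustration, not the daemon's implementation: `resolve_tokens` is a hypothetical name, an in-memory SQLite database stands in for the destination database, the embedded SQL is assumed not to contain a `}` character, and treating a NULL result as an empty string is a choice made for this example.

```python
import re
import sqlite3

# Matches ${...} and captures the embedded SQL (non-greedy, up to the first '}').
TOKEN = re.compile(r"\$\{(.*?)\}", re.DOTALL)


def resolve_tokens(text, conn):
    """Replace each ${<sql>} token with the first column of the first row
    returned by running the embedded SQL on conn."""
    def replace(match):
        row = conn.execute(match.group(1)).fetchone()
        if row is None or row[0] is None:
            return ""  # no rows: the token resolves to an empty string
        return str(row[0])
    return TOKEN.sub(replace, text)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payment_aggregate (source_id INTEGER)")
conn.execute("INSERT INTO payment_aggregate VALUES (41)")
conn.execute("INSERT INTO payment_aggregate VALUES (42)")

source_sql = (
    "SELECT id, amount FROM payments WHERE id > "
    "${SELECT COALESCE(MAX(source_id), 0) FROM payment_aggregate}"
)
resolved = resolve_tokens(source_sql, conn)
```

Here `resolved` ends in `id > 42`, so the next run would only pick up source rows beyond the current watermark.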
Postilion Gateway Header Supplier
The built-in PnGatewayHeaderSupplier generates a JWT Authorization header for authenticating against the Postilion gateway. The token is cached for 50 seconds.
| Property | Description |
|---|---|
| `reporting.normalisation.pn.gateway.userId` | User ID used to generate the system JWT for Postilion gateway calls |
To use it, set headerSupplier to com.ukheshe.services.reporting.daemon.PnGatewayHeaderSupplier in the worker config.
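The caching and merge behaviour of a header supplier can be illustrated as follows. This sketch does not reproduce `PnGatewayHeaderSupplier` or the `IHeaderSupplier` interface: `CachingHeaderSupplier`, `build_headers` and the `make_token` callback are hypothetical, no real JWT is signed, and the `Bearer` scheme is an assumption.

```python
import time


class CachingHeaderSupplier:
    """Supplies an Authorization header, reusing the token for ttl_seconds."""

    def __init__(self, make_token, ttl_seconds=50, clock=time.monotonic):
        self._make_token = make_token  # hypothetical callback that returns a JWT string
        self._ttl = ttl_seconds
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def headers(self):
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            self._token = self._make_token()  # regenerate only once the cache expires
            self._expires_at = now + self._ttl
        return {"Authorization": f"Bearer {self._token}"}  # scheme is an assumption


def build_headers(static_headers, supplier=None):
    """Merge supplier headers on top of static ones; the supplier wins on conflicts."""
    merged = dict(static_headers or {})
    if supplier is not None:
        merged.update(supplier.headers())
    return merged
```

A worker that sets both `headers` and `headerSupplier` would, under this sketch, send the static `X-Api-Version` header unchanged while any static `Authorization` value is replaced by the supplier's.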
Full Example Config
The following property value defines two workers: one that polls a local SQL table every 5 minutes and one that calls an external API once a day at 02:00 UTC.
Property: reporting.normalisation.daemon.config
[
  {
    "cron": "*/5 * * * *",
    "type": "SQL",
    "sourceSql": "SELECT wallet_id, SUM(amount), COUNT(*), MAX(created) FROM payment WHERE tenant_id = 101 AND created >= ${SELECT COALESCE(MAX(last_seen), '2000-01-01') FROM wallet_daily_summary WHERE tenant_id = 101} GROUP BY wallet_id",
    "insertSql": "INSERT INTO wallet_daily_summary (wallet_id, total_amount, tx_count, last_seen, tenant_id) VALUES (?, ?, ?, ?, 101)"
  },
  {
    "cron": "0 2 * * *",
    "type": "HTTP",
    "url": "https://partner.example.com/api/settlements",
    "method": "GET",
    "headerSupplier": "com.ukheshe.services.reporting.daemon.PnGatewayHeaderSupplier",
    "jsonPaths": ["$.settlementId", "$.amount", "$.settledAt"],
    "insertSql": "INSERT INTO settlement_aggregate (external_id, amount, settled_at) VALUES (?, ?, ?)"
  }
]
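Before saving a value like this to the property, it can help to check it against the field tables above. The following is a hedged sketch of such a pre-flight check; `validate_config` is a hypothetical helper, not part of the platform, and the placeholder count is a naive character count that would miscount a literal `?` inside a string.

```python
import json

COMMON = {"cron", "type", "insertSql"}
BY_TYPE = {"SQL": {"sourceSql"}, "HTTP": {"url", "method", "jsonPaths"}}


def validate_config(config_text):
    """Return a list of human-readable problems; an empty list means none found."""
    workers = json.loads(config_text)
    if not isinstance(workers, list):
        return ["config must be a JSON array"]
    problems = []
    for i, worker in enumerate(workers):
        if not isinstance(worker, dict):
            problems.append(f"worker {i}: must be a JSON object")
            continue
        wtype = worker.get("type")
        required = COMMON | BY_TYPE.get(wtype, set())
        if wtype not in BY_TYPE:
            problems.append(f"worker {i}: type must be SQL or HTTP")
        missing = sorted(required - set(worker))
        if missing:
            problems.append(
                f"worker {i}: missing required fields: {', '.join(missing)}"
            )
        cron = worker.get("cron")
        if isinstance(cron, str) and len(cron.split()) != 5:
            problems.append(f"worker {i}: cron must have 5 fields")
        paths, insert_sql = worker.get("jsonPaths"), worker.get("insertSql")
        if wtype == "HTTP" and isinstance(paths, list) and isinstance(insert_sql, str):
            if insert_sql.count("?") != len(paths):
                problems.append(
                    f"worker {i}: jsonPaths count does not match ? placeholders"
                )
    return problems
```

Run against the two-worker value above, this check should return an empty list, since both workers carry every required field and the HTTP worker has three paths for three placeholders.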
Note: The daemon writes to the same database the reporting service uses. Ensure the INSERT target tables exist before activating a worker. The daemon does not create tables.
