Normalisation Daemon

The reporting normalisation daemon is a scheduled batch job that reads data from a source — either a SQL query or an HTTP endpoint — and writes each row into a destination table via a parameterised INSERT statement. It runs on a per-worker cron schedule and is the primary mechanism for populating aggregate and summary tables from live platform data.

How It Works

The daemon's outer tick runs every minute. Each worker in the config carries its own cron expression. On each tick, the daemon evaluates which workers are due and streams their source rows in order. For each source row, it executes the worker's configured INSERT against the destination database. Processing is throttled (maxTps) and committed in batches (commitFrequency); both are configurable under Global settings.
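The per-tick selection step can be sketched roughly as follows. The sketch truncates the tick to the whole UTC minute and keeps, in config order, the workers whose schedule matches that minute. The Worker record and its isDue predicate are hypothetical stand-ins for a parsed worker config and its cron expression. They are not the daemon's actual classes.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class TickScheduler {
    // Hypothetical worker handle: a name plus a test for "is this UTC minute due?".
    record Worker(String name, Predicate<ZonedDateTime> isDue) {}

    // Names of the workers due at this tick, in config order.
    static List<String> dueWorkers(List<Worker> workers, Instant tick) {
        ZonedDateTime minute = tick.atZone(ZoneOffset.UTC).truncatedTo(ChronoUnit.MINUTES);
        List<String> due = new ArrayList<>();
        for (Worker w : workers) {
            if (w.isDue().test(minute)) due.add(w.name());
        }
        return due;
    }

    // Outer tick: once a minute, run each due worker in turn on a single thread.
    static ScheduledExecutorService start(List<Worker> workers, Consumer<String> runWorker) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            for (String name : dueWorkers(workers, Instant.now())) {
                try {
                    runWorker.accept(name);
                } catch (RuntimeException e) {
                    // An exception escaping the task would suppress every later tick.
                    System.err.println("worker " + name + " failed: " + e);
                }
            }
        }, 0, 1, TimeUnit.MINUTES);
        return scheduler;
    }
}
```

With a single-threaded scheduler, due workers run one after another. The try/catch matters because scheduleAtFixedRate stops all later executions once a run throws.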


Configuration

All workers are defined in a single JSON array stored in the reporting.normalisation.daemon.config platform property. Each element in the array is one worker.

Global settings

Property | Default | Description
reporting.normalisation.daemon.config | [] | JSON array of worker config objects (see below)
reporting.normalisation.daemon.commitFrequency | 100 | Number of rows between JPA flush/commit cycles
reporting.normalisation.daemon.maxTps | 100 | Maximum rows processed per second across all active workers in a single tick
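Here is a minimal sketch of how commitFrequency and maxTps could interact within one tick. It assumes a single writer instance is shared by every worker that runs in that tick, which is what makes the rate limit apply across all of them. The insert and commit callbacks are hypothetical stand-ins for the JPA insert and flush/commit. The daemon's real throttling algorithm is not documented here.

```java
import java.util.Iterator;
import java.util.function.Consumer;

final class ThrottledBatchWriter {
    private final int commitFrequency;                    // rows between commits
    private final int maxTps;                             // max rows per second for the whole tick
    private final long tickStartNanos = System.nanoTime();
    private long processed;                               // rows written so far in this tick, all workers

    ThrottledBatchWriter(int commitFrequency, int maxTps) {
        if (commitFrequency <= 0 || maxTps <= 0) throw new IllegalArgumentException("settings must be positive");
        this.commitFrequency = commitFrequency;
        this.maxTps = maxTps;
    }

    /** Writes one worker's rows, committing every commitFrequency rows and once more for any remainder. */
    <R> int write(Iterator<R> rows, Consumer<R> insert, Runnable commit) {
        int commits = 0;
        int sinceCommit = 0;
        while (rows.hasNext()) {
            throttle();
            insert.accept(rows.next());
            processed++;
            if (++sinceCommit == commitFrequency) {
                commit.run();
                commits++;
                sinceCommit = 0;
            }
        }
        if (sinceCommit > 0) {
            commit.run();
            commits++;
        }
        return commits;
    }

    // Row k (0-based) may not start before k / maxTps seconds after the tick began.
    private void throttle() {
        long earliest = tickStartNanos + processed * 1_000_000_000L / maxTps;
        long wait = earliest - System.nanoTime();
        if (wait <= 0) return;
        try {
            Thread.sleep(wait / 1_000_000, (int) (wait % 1_000_000));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while throttling", e);
        }
    }
}
```

With the defaults (100 and 100), a worker that yields 250 rows would be committed three times: after rows 100 and 200, then once for the last 50. It would also take at least about 2.5 seconds.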

Worker Config Fields

Common fields (all types)

Field | Required | Description
cron | Yes | Standard 5-field cron expression (minute, hour, day of month, month, day of week), evaluated in UTC. Controls how often this worker runs.
type | Yes | Source type: SQL or HTTP
insertSql | Yes | Parameterised SQL INSERT statement with one ? placeholder per value extracted from the source. Values map positionally: the first extracted value binds to the first ?, the second to the second, and so on.
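To make the cron semantics concrete, here is a minimal 5-field matcher that checks a tick against an expression in UTC. It supports *, single numbers, ranges, lists and /steps. It also applies the classic rule that when both day fields are restricted, a match on either is enough. This is an illustrative sketch, not the daemon's parser. Month and weekday names (JAN, MON) and extensions such as L or # are not handled, and the cron library the daemon actually uses is not documented here.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

final class CronMatcher {
    // True if the UTC minute containing `instant` matches the 5-field expression.
    static boolean matches(String expr, Instant instant) {
        String[] f = expr.trim().split("\\s+");
        if (f.length != 5) throw new IllegalArgumentException("expected 5 fields: " + expr);
        ZonedDateTime t = instant.atZone(ZoneOffset.UTC);
        int dow = t.getDayOfWeek().getValue() % 7; // java.time: Mon=1..Sun=7; cron: Sun=0
        boolean minute = field(f[0], t.getMinute(), 0, 59);
        boolean hour = field(f[1], t.getHour(), 0, 23);
        boolean month = field(f[3], t.getMonthValue(), 1, 12);
        boolean dom = field(f[2], t.getDayOfMonth(), 1, 31);
        boolean dowMatch = field(f[4], dow, 0, 7) || (dow == 0 && field(f[4], 7, 0, 7)); // 7 also means Sunday
        // Classic cron rule: if both day fields are restricted, a match on either is enough.
        boolean day = (f[2].startsWith("*") || f[4].startsWith("*")) ? dom && dowMatch : dom || dowMatch;
        return minute && hour && month && day;
    }

    // One field: comma-separated parts, each "*", "n" or "a-b", optionally followed by "/step".
    private static boolean field(String spec, int value, int min, int max) {
        for (String part : spec.split(",")) {
            String[] baseAndStep = part.split("/", 2);
            String base = baseAndStep[0];
            int step = baseAndStep.length == 2 ? Integer.parseInt(baseAndStep[1]) : 1;
            int lo;
            int hi;
            if (base.equals("*")) {
                lo = min;
                hi = max;
            } else if (base.contains("-")) {
                String[] range = base.split("-", 2);
                lo = Integer.parseInt(range[0]);
                hi = Integer.parseInt(range[1]);
            } else {
                lo = Integer.parseInt(base);
                hi = baseAndStep.length == 2 ? max : lo; // "n/step" runs from n to the field maximum
            }
            if (value >= lo && value <= hi && (value - lo) % step == 0) return true;
        }
        return false;
    }
}
```

For example, "*/5 * * * *" matches 10:05 UTC but not 10:07, and "0 2 * * *" matches only 02:00 UTC each day.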

SQL worker

Reads rows from a native SQL SELECT. Each column in the result row maps positionally to a ? in insertSql.

Field | Required | Description
sourceSql | Yes | Native SQL SELECT query to run against the reporting database. Supports ${<sql>} tokens (see SQL Tokens below).

Example:

{
  "cron": "*/5 * * * *",
  "type": "SQL",
  "sourceSql": "SELECT id, wallet_id, amount, created FROM payment WHERE processed = 0 AND id > ${SELECT COALESCE(MAX(source_id), 0) FROM payment_aggregate}",
  "insertSql": "INSERT INTO payment_aggregate (source_id, wallet_id, amount, created) VALUES (?, ?, ?, ?)"
}
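For a SQL worker, the positional mapping amounts to copying column i of each result row into placeholder i of the INSERT. The sketch below shows this with plain JDBC. It also includes a hypothetical pre-flight check that insertSql has as many ? placeholders as the source has columns. It assumes one executeUpdate per row; how the daemon actually batches and commits through JPA is not shown.

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

final class PositionalCopy {
    // Counts ? placeholders, ignoring any inside single-quoted literals ('' escapes toggle twice and cancel out).
    static int countPlaceholders(String sql) {
        int count = 0;
        boolean inLiteral = false;
        for (char c : sql.toCharArray()) {
            if (c == '\'') inLiteral = !inLiteral;
            else if (c == '?' && !inLiteral) count++;
        }
        return count;
    }

    // Hypothetical pre-flight check: exactly one placeholder per source column.
    static void checkArity(String insertSql, int sourceColumns) {
        int placeholders = countPlaceholders(insertSql);
        if (placeholders != sourceColumns) {
            throw new IllegalArgumentException(sourceColumns + " source columns but " + placeholders + " placeholders");
        }
    }

    // Copies each source row: column i (1-based) binds to the i-th ? of the INSERT.
    static int copy(ResultSet source, PreparedStatement insert) throws SQLException {
        int columns = source.getMetaData().getColumnCount();
        int rows = 0;
        while (source.next()) {
            for (int i = 1; i <= columns; i++) insert.setObject(i, source.getObject(i));
            insert.executeUpdate();
            rows++;
        }
        return rows;
    }
}
```

In the example above, the four selected columns (id, wallet_id, amount, created) bind in that order to source_id, wallet_id, amount and created.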

HTTP worker

Calls an external endpoint that returns a JSON array. JSONPath expressions extract the insert parameters from each element in the array.

Field | Required | Description
url | Yes | Endpoint URL to call
method | Yes | HTTP method: GET, POST, etc.
body | No | Request body string. Supports ${<sql>} tokens.
headers | No | JSON object of static request headers (key-value pairs)
headerSupplier | No | Fully-qualified class name of an IHeaderSupplier implementation that provides dynamic headers (e.g. a signed JWT). Supplier headers are merged on top of any static headers, so the supplier wins on conflicts.
jsonPaths | Yes | JSON array of JSONPath expressions. Each expression extracts one value from a result element; values map positionally to the ? placeholders in insertSql.
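The header rule above can be sketched with the JDK's java.net.http client. Static headers are applied first, then the dynamic headers are laid over them, so a shared name takes the supplier's value. The Supplier<Map<String, String>> is a hypothetical stand-in for IHeaderSupplier, whose actual method signature is not documented here. Name matching in this sketch is case-sensitive, which real HTTP header handling is not.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

final class WorkerRequest {
    // Static headers go in first; putAll then overwrites shared names, so supplier headers win.
    static Map<String, String> mergeHeaders(Map<String, String> staticHeaders,
                                            Supplier<Map<String, String>> headerSupplier) {
        Map<String, String> merged = new LinkedHashMap<>();
        if (staticHeaders != null) merged.putAll(staticHeaders);
        if (headerSupplier != null) merged.putAll(headerSupplier.get());
        return merged;
    }

    // Builds (but does not send) the request described by url, method, body and the merged headers.
    static HttpRequest build(String url, String method, String body, Map<String, String> headers) {
        HttpRequest.BodyPublisher publisher = body == null
                ? HttpRequest.BodyPublishers.noBody()
                : HttpRequest.BodyPublishers.ofString(body);
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(url)).method(method, publisher);
        headers.forEach(builder::header);
        return builder.build();
    }
}
```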

Example:

{
  "cron": "0 * * * *",
  "type": "HTTP",
  "url": "https://partner-api.example.com/transactions",
  "method": "POST",
  "body": "{\"since\": \"${SELECT COALESCE(MAX(created_at), '1970-01-01') FROM partner_transactions}\"}",
  "headers": {
    "X-Api-Version": "2"
  },
  "headerSupplier": "com.ukheshe.services.reporting.daemon.PnGatewayHeaderSupplier",
  "jsonPaths": ["$.id", "$.amount", "$.currency", "$.createdAt"],
  "insertSql": "INSERT INTO partner_transactions (external_id, amount, currency, created_at) VALUES (?, ?, ?, ?)"
}
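The JDK has no JSON parser or JSONPath engine. This sketch therefore assumes the response array has already been parsed, by whatever JSON library the service uses, into a list of maps. It supports only simple dotted paths such as $.id or $.meta.id. It illustrates the positional contract: one parameter list per array element, in jsonPaths order, which is also the order of the ? placeholders. Full JSONPath (filters, wildcards, array indexes) is out of scope. Returning null for a missing key is an assumption about the daemon's behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SimpleJsonPath {
    // Resolves a dotted path such as "$.amount" or "$.meta.id" against one parsed element.
    static Object read(Object element, String path) {
        if (!path.startsWith("$")) throw new IllegalArgumentException("path must start with $: " + path);
        Object current = element;
        for (String key : path.substring(1).split("\\.")) {
            if (key.isEmpty()) continue;                       // the empty segment before the first dot
            if (!(current instanceof Map<?, ?> map)) return null; // assumption: missing -> null
            current = map.get(key);
        }
        return current;
    }

    // One parameter list per array element, in jsonPaths order (= ? order in insertSql).
    static List<List<Object>> toInsertParams(List<Map<String, Object>> elements, List<String> jsonPaths) {
        List<List<Object>> rows = new ArrayList<>();
        for (Map<String, Object> element : elements) {
            List<Object> params = new ArrayList<>();
            for (String p : jsonPaths) params.add(read(element, p));
            rows.add(params);
        }
        return rows;
    }
}
```

With the example's jsonPaths, each element yields [id, amount, currency, createdAt], bound to external_id, amount, currency and created_at.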

SQL Tokens

Both sourceSql (SQL type) and body (HTTP type) support ${<sql>} tokens. Before the source is queried or the HTTP request is sent, each token is replaced with the first column of the first row returned by running the embedded SQL against the destination database.

This is the standard watermarking pattern: pull only the records the destination has not yet seen.

Examples:

-- Pull only new rows since the last inserted ID
SELECT id, amount FROM payments WHERE id > ${SELECT COALESCE(MAX(source_id), 0) FROM payment_aggregate}

-- Pull rows created after the latest record in the aggregate table (non-numeric token values are quoted)
SELECT * FROM transactions WHERE created > '${SELECT COALESCE(MAX(source_created), '2000-01-01') FROM transaction_summary}'

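If the token query returns no rows, the token resolves to an empty string. An aggregate query such as MAX() always returns exactly one row, but that row's value is NULL when the table is empty. This is why the examples wrap MAX() in COALESCE with a starting value.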
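Here is a minimal sketch of token substitution. It assumes plain textual replacement of each ${...} with the first cell its embedded query returns. The HTTP example, which wraps its token in JSON quotes, suggests substitution is textual. The jdbc helper is a hypothetical resolver. Rendering a NULL cell as an empty string is also an assumption, since only the no-rows case is documented.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Objects;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SqlTokens {
    // ${...}: the lazy group stops at the first '}', so token SQL must not itself contain '}'.
    private static final Pattern TOKEN = Pattern.compile("\\$\\{(.+?)\\}");

    // Textually replaces every token with the value the resolver returns for its embedded SQL.
    static String resolve(String template, Function<String, String> firstCell) {
        return TOKEN.matcher(template)
                .replaceAll(m -> Matcher.quoteReplacement(firstCell.apply(m.group(1))));
    }

    // Hypothetical JDBC resolver against the destination database.
    // No rows -> "" (documented); a NULL cell -> "" (assumption).
    static Function<String, String> jdbc(Connection destination) {
        return sql -> {
            try (Statement st = destination.createStatement();
                 ResultSet rs = st.executeQuery(sql)) {
                return rs.next() ? Objects.toString(rs.getObject(1), "") : "";
            } catch (SQLException e) {
                throw new IllegalStateException("token query failed: " + sql, e);
            }
        };
    }
}
```

If the aggregate table's highest source_id is 42, the first example above becomes SELECT id, amount FROM payments WHERE id > 42 before it runs.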


Postilion Gateway Header Supplier

The built-in PnGatewayHeaderSupplier generates a JWT Authorization header for authenticating against the Postilion gateway. The token is cached for 50 seconds.

Property | Description
reporting.normalisation.pn.gateway.userId | User ID used to generate the system JWT for Postilion gateway calls

To use it, set headerSupplier to com.ukheshe.services.reporting.daemon.PnGatewayHeaderSupplier in the worker config.
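The 50-second cache described above can be sketched as a small time-to-live wrapper around whatever mints the token. The mint supplier is a hypothetical stand-in for JWT generation, which is not documented here. The clock is injected so the expiry can be tested. This illustrates the caching rule; it is not PnGatewayHeaderSupplier's actual code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

final class CachedToken {
    private final Supplier<String> mint;     // hypothetical: produces a fresh system JWT
    private final Supplier<Instant> clock;   // injected so tests can move time forward
    private final Duration ttl;              // 50 seconds for the Postilion supplier
    private String token;
    private Instant mintedAt;

    CachedToken(Supplier<String> mint, Supplier<Instant> clock, Duration ttl) {
        this.mint = mint;
        this.clock = clock;
        this.ttl = ttl;
    }

    // Returns the cached token while it is younger than ttl; otherwise mints and caches a new one.
    synchronized String get() {
        Instant now = clock.get();
        if (token == null || !now.isBefore(mintedAt.plus(ttl))) {
            token = mint.get();
            mintedAt = now;
        }
        return token;
    }
}
```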


Full Example Config

The following property value defines two workers: one that polls a local SQL table every 5 minutes and one that calls an external API once a day at 02:00 UTC.

Property: reporting.normalisation.daemon.config

[
  {
    "cron": "*/5 * * * *",
    "type": "SQL",
    "sourceSql": "SELECT wallet_id, SUM(amount), COUNT(*), MAX(created) FROM payment WHERE tenant_id = 101 AND created >= '${SELECT COALESCE(MAX(last_seen), '2000-01-01') FROM wallet_daily_summary WHERE tenant_id = 101}' GROUP BY wallet_id",
    "insertSql": "INSERT INTO wallet_daily_summary (wallet_id, total_amount, tx_count, last_seen, tenant_id) VALUES (?, ?, ?, ?, 101)"
  },
  {
    "cron": "0 2 * * *",
    "type": "HTTP",
    "url": "https://partner.example.com/api/settlements",
    "method": "GET",
    "headerSupplier": "com.ukheshe.services.reporting.daemon.PnGatewayHeaderSupplier",
    "jsonPaths": ["$.settlementId", "$.amount", "$.settledAt"],
    "insertSql": "INSERT INTO settlement_aggregate (external_id, amount, settled_at) VALUES (?, ?, ?)"
  }
]

Note

The daemon writes to the same database the reporting service uses. Ensure the INSERT target tables exist before activating a worker. The daemon does not create tables.
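Because the daemon does not create tables, a pre-flight check before activating a worker can catch a missing target. This hypothetical helper pulls the table name out of insertSql and looks it up through standard JDBC metadata. It is not part of the daemon and handles only unqualified names written directly after INSERT INTO.

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Locale;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TargetTableCheck {
    // Unqualified table name directly after INSERT INTO (keywords matched case-insensitively).
    private static final Pattern TARGET =
            Pattern.compile("^\\s*INSERT\\s+INTO\\s+(\\w+)", Pattern.CASE_INSENSITIVE);

    static Optional<String> targetTable(String insertSql) {
        Matcher m = TARGET.matcher(insertSql);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Looks the table up via JDBC metadata, trying the name as written, lower-case and upper-case,
    // because drivers differ in how they store unquoted identifiers.
    static boolean exists(Connection connection, String table) throws SQLException {
        DatabaseMetaData md = connection.getMetaData();
        for (String candidate : new String[] {table, table.toLowerCase(Locale.ROOT), table.toUpperCase(Locale.ROOT)}) {
            try (ResultSet rs = md.getTables(null, null, candidate, new String[] {"TABLE"})) {
                while (rs.next()) {
                    // '_' is a wildcard in the name pattern, so confirm the exact name.
                    if (table.equalsIgnoreCase(rs.getString("TABLE_NAME"))) return true;
                }
            }
        }
        return false;
    }
}
```

For example, you could run targetTable on each worker's insertSql and hold back the config if exists returns false for any of them.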