/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
 * Common tasks:
 * - Convert binary payloads to string/JSON
 * - Extract metadata
 * - Filter messages
 * - Add timestamps/context
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * What this implementation does:
 * - For an MQTT trigger (`kind === 'mqtt'` with the `mqtt` details present) it
 *   returns `{ contentType, payload, payloadAsString }`.
 * - For every other case it returns `{ kind }` only.
 *   NOTE(review): `{ kind }` does not match the parameters `main()` declares
 *   (`payload`, `payloadAsString`, `contentType`) — confirm how non-MQTT
 *   triggers are meant to be handled before enabling them.
 *
 * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
 * @param payload - Raw trigger data (format varies by trigger type); for MQTT it
 *   is treated as an array of byte values
 * @returns Processed data for `main()`
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    mqtt?: {
      topic: string,
      retain: boolean,
      pkid: number,
      qos: number,
      v5?: {
        payload_format_indicator?: number,
        topic_alias?: number,
        response_topic?: string,
        correlation_data?: Array<number>,
        user_properties?: Array<[string, string]>,
        subscription_identifiers?: Array<number>,
        content_type?: string
      }
    }
  },
  payload: Array<number>,
) {
  if (wm_trigger.kind === 'mqtt' && wm_trigger.mqtt) {
    // Pack the numeric array into a byte buffer so it can be decoded as text.
    const uint8Payload = new Uint8Array(payload);
    // TextDecoder defaults to UTF-8 and is non-fatal: malformed byte sequences
    // are replaced with U+FFFD instead of throwing.
    const payloadAsString = new TextDecoder().decode(uint8Payload);

    return {
      // Only MQTT v5 messages can carry a content type; otherwise undefined.
      contentType: wm_trigger.mqtt.v5?.content_type,
      payload: uint8Payload,
      payloadAsString
    };
  }

  // Not an MQTT trigger (or MQTT details missing): pass through the kind only.
  return { kind: wm_trigger.kind };
}
54 |
|
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * The parameter names correspond to the keys of the object returned by
 * `preprocessor()` for MQTT triggers.
 *
 * @param payload - Raw binary payload
 * @param payloadAsString - Decoded string payload
 * @param contentType - MQTT v5 content type (if available)
 */
export async function main(payload: Uint8Array, payloadAsString: string, contentType?: string) {
  // Intentionally empty: add the handling logic for the processed event here.
}