1 | |
2 | * General Trigger Preprocessor |
3 | * |
4 | * ⚠️ This function runs BEFORE the main function. |
5 | * |
6 | * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`. |
7 | * Common tasks: |
8 | * - Convert binary payloads to string/JSON |
9 | * - Extract metadata |
10 | * - Filter messages |
11 | * - Add timestamps/context |
12 | * |
13 | * The returned object determines `main()` parameters: |
14 | * - `{a: 1, b: 2}` → `main(a, b)` |
15 | * - `{payload}` → `main(payload)` |
16 | * |
17 | * @param wm_trigger - Trigger details (e.g., MQTT, HTTP) |
18 | * @param payload - Raw trigger data (format varies by trigger type) |
19 | * @returns Processed data for `main()` |
20 | */ |
21 | export async function preprocessor( |
22 | wm_trigger: { |
23 | kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp', |
24 | gcp?: { |
25 | attributes?: Record<string, string>, |
26 | message_id: string, |
27 | subscription: string, |
28 | ordering_key?: string, |
29 | pull?: { |
30 | publish_time?: number, |
31 | }, |
32 | push?: { |
33 | headers: Record<string, string>, |
34 | publish_time: string, |
35 | } |
36 | } |
37 | }, |
38 | msg: string, |
39 | ) { |
40 | if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) { |
41 | const decodedString = atob(msg); |
42 | |
43 | const attributes = wm_trigger.gcp.attributes || {}; |
44 | const contentType = attributes['content-type'] || attributes['Content-Type']; |
45 | const isJson = contentType === 'application/json'; |
46 |
|
47 | let parsedMessage: any = decodedString; |
48 | if (isJson) { |
49 | try { |
50 | parsedMessage = JSON.parse(decodedString); |
51 | } catch (err) { |
52 | throw new Error(`Invalid JSON payload: ${err}`); |
53 | } |
54 | } |
55 | return { |
56 | messageAsDecodedString: decodedString, |
57 | contentType, |
58 | parsedMessage, |
59 | attributes |
60 | }; |
61 | } |
62 |
|
63 | |
64 | |
65 | throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`) |
66 | } |
67 |
|
68 | |
69 | * Main Function - Handles processed trigger events |
70 | * |
71 | * ⚠️ Called AFTER `preprocessor()`, with its return values. |
72 | * |
73 | * @param payload - Raw binary payload |
74 | * @param payloadAsString - Decoded string payload |
75 | * @param contentType - Content type from message attributes (e.g., application/json) |
76 | * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString |
77 | * @param attributes - Key-value metadata sent with the message |
78 | */ |
79 | export async function main( |
80 | messageAsDecodedString: string, |
81 | contentType?: string, |
82 | parsedMessage?: any, |
83 | attributes?: Record<string, string> |
84 | ) { |
85 |
|
86 | } |