/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to main().
 * Common tasks:
 * - Convert binary payloads to string/JSON
 * - Extract metadata
 * - Filter messages
 * - Add timestamps/context
 *
 * The returned object determines main() parameters:
 * - {a: 1, b: 2} → main(a, b)
 * - {msg} → main(msg)
 *
 * This implementation accepts SQS triggers only: it flattens the `sqs`
 * details and the raw message into camelCase fields, and rejects every
 * other trigger kind.
 *
 * @param wm_trigger - Trigger details (e.g., MQTT, HTTP, SQS)
 * @param msg - Raw trigger data (format varies by trigger type)
 * @returns Processed data for main(): queueUrl, messageId, receiptHandle,
 *   attributes, messageAttributes and msg
 * @throws Error when the trigger kind is not 'sqs', or when it is 'sqs' but
 *   the `sqs` details object is missing
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    sqs?: {
      queue_url: string,
      message_id?: string,
      receipt_handle?: string,
      attributes: Record<string, string>,
      message_attributes?: Record<string, {
        string_value?: string,
        data_type: string
      }>
    }
  },
  msg: string
) {
  if (wm_trigger.kind !== 'sqs') {
    throw new Error(`Expected sqs trigger kind, got: ${wm_trigger.kind}`);
  }

  // Reported separately from a wrong kind: "Expected sqs ..., got: sqs"
  // would be self-contradictory and hide the real problem.
  const sqs = wm_trigger.sqs;
  if (!sqs) {
    throw new Error("Trigger kind is 'sqs' but wm_trigger.sqs details are missing");
  }

  return {
    queueUrl: sqs.queue_url,
    messageId: sqs.message_id,
    receiptHandle: sqs.receipt_handle,
    attributes: sqs.attributes,
    messageAttributes: sqs.message_attributes,
    msg
  };
}
|
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER preprocessor(), with its return values.
 *
 * The body is currently an empty placeholder: every argument is accepted
 * and ignored, and the returned promise resolves to undefined.
 *
 * @param queueUrl - URL of the SQS queue
 * @param messageId - Unique message identifier (if available)
 * @param receiptHandle - Receipt handle for message deletion (if available)
 * @param attributes - Message attributes from SQS
 * @param messageAttributes - Custom message attributes (if any)
 * @param msg - Raw message content
 * @returns A promise that resolves with no value
 */
export async function main(
  queueUrl: string,
  messageId: string | undefined,
  receiptHandle: string | undefined,
  attributes: Record<string, string>,
  messageAttributes: Record<string, { string_value?: string; data_type: string }> | undefined,
  msg: string
): Promise<void> {
  // Intentionally empty: message handling logic goes here.
}
|