1 | |
2 | * General Trigger Preprocessor |
3 | * |
4 | * ⚠️ This function runs BEFORE the main function. |
5 | * |
6 | * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`. |
7 | * Common tasks: |
8 | * - Convert binary payloads to string/JSON |
9 | * - Extract metadata |
10 | * - Filter messages |
11 | * - Add timestamps/context |
12 | * |
13 | * The returned object determines `main()` parameters: |
14 | * - `{a: 1, b: 2}` → `main(a, b)` |
15 | * - `{payload}` → `main(payload)` |
16 | * |
17 | * @param event - Trigger data and details (e.g., MQTT, HTTP) |
18 | * @returns Processed data for `main()` |
19 | */ |
20 | export async function preprocessor( |
21 | event: { |
22 | payload: string, |
23 | kind: 'gcp', |
24 | message_id: string, |
25 | subscription: string, |
26 | ordering_key?: string, |
27 | attributes?: Record<string, string>, |
28 | delivery_type: "push" | "pull", |
29 | headers?: Record<string, string>, |
30 | publish_time?: string, |
31 | ack_id?: string // acknowledgment ID for manual acknowledgment (only for pull delivery) |
32 | }, |
33 | ) { |
34 | if (event.kind === 'gcp') { |
35 | const decodedString = atob(event.payload); |
36 |
|
37 | const attributes = event.attributes || {}; |
38 | const contentType = attributes['content-type'] || attributes['Content-Type']; |
39 | const isJson = contentType === 'application/json'; |
40 |
|
41 | let parsedMessage: any = decodedString; |
42 |
|
43 | if (isJson) { |
44 | try { |
45 | parsedMessage = JSON.parse(decodedString); |
46 | } catch (err) { |
47 | throw new Error(`Invalid JSON payload: ${err}`); |
48 | } |
49 | } |
50 |
|
51 | return { |
52 | messageAsDecodedString: decodedString, |
53 | contentType, |
54 | parsedMessage, |
55 | attributes, |
56 | ack_id: event.ack_id |
57 | }; |
58 | } |
59 |
|
60 | |
61 | |
62 | throw new Error(`Expected gcp trigger kind got: ${event.kind}`) |
63 | } |
64 |
|
65 | |
66 | * Main Function - Handles processed trigger events |
67 | * |
68 | * ⚠️ Called AFTER `preprocessor()`, with its return values. |
69 | * |
70 | * @param payload - Raw base64 encoded payload |
71 | * @param payloadAsString - Decoded string payload |
72 | * @param contentType - Content type from message attributes (e.g., application/json) |
73 | * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString |
74 | * @param attributes - Key-value metadata sent with the message |
75 | * @param ack_id - Acknowledgment ID for manual acknowledgment (only available for pull delivery, undefined for push) |
76 | */ |
77 | export async function main( |
78 | messageAsDecodedString: string, |
79 | contentType?: string, |
80 | parsedMessage?: any, |
81 | attributes?: Record<string, string>, |
82 | ack_id?: string |
83 | ) { |
84 |
|
85 | } |