GCP Pub/Sub script with preprocessor template

Script windmill Verified

by dieriba.pro916 · 4/3/2025

The script

Submitted by dieriba.pro916 Bun
Verified 277 days ago
1
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
 * Common tasks:
 * - Convert binary payloads to string/JSON
 * - Extract metadata
 * - Filter messages
 * - Add timestamps/context
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * This implementation handles GCP Pub/Sub events: it base64-decodes the
 * payload as UTF-8 text and, when the `content-type` / `Content-Type`
 * attribute names the `application/json` media type (parameters such as
 * `; charset=utf-8` and letter case are ignored), parses it as JSON.
 *
 * @param event - Trigger data and details (e.g., MQTT, HTTP)
 * @returns Processed data for `main()`: the decoded string, the raw content
 *   type attribute, the parsed message (or the decoded string when the
 *   payload is not JSON), the attributes (defaulting to `{}`) and the `ack_id`.
 * @throws Error if `event.kind` is not `'gcp'`.
 * @throws Error if the content type is JSON but the payload cannot be parsed.
 * @throws Whatever `atob` throws if `event.payload` is not valid base64.
 */
export async function preprocessor(
  event: {
    payload: string,
    kind: 'gcp',
    message_id: string,
    subscription: string,
    ordering_key?: string,
    attributes?: Record<string, string>,
    delivery_type: "push" | "pull",
    headers?: Record<string, string>,
    publish_time?: string,
    ack_id?: string // acknowledgment ID for manual acknowledgment (only for pull delivery)
  },
) {
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  // Widen to `string` so the runtime check stays meaningful to the type checker.
  const kind: string = event.kind;
  if (kind !== 'gcp') {
    throw new Error(`Expected gcp trigger kind got: ${kind}`);
  }

  const decodedString = decodeBase64Utf8(event.payload);

  const attributes = event.attributes || {};
  const contentType = attributes['content-type'] || attributes['Content-Type'];

  // Non-JSON payloads are passed through as the decoded string.
  let parsedMessage: any = decodedString;

  if (isJsonContentType(contentType)) {
    try {
      parsedMessage = JSON.parse(decodedString);
    } catch (err) {
      throw new Error(`Invalid JSON payload: ${err}`);
    }
  }

  return {
    messageAsDecodedString: decodedString,
    contentType,
    parsedMessage,
    attributes,
    ack_id: event.ack_id
  };
}

/**
 * Decodes a base64 string into text.
 *
 * `atob` yields one character per byte, which garbles multi-byte UTF-8
 * sequences, so the bytes are re-decoded as UTF-8. If the bytes are not valid
 * UTF-8 (e.g. an arbitrary binary payload), the byte string from `atob` is
 * returned unchanged so no data is lost.
 */
function decodeBase64Utf8(payload: string): string {
  const byteString = atob(payload);
  const bytes = Uint8Array.from(byteString, (ch) => ch.charCodeAt(0));
  try {
    // `fatal: true` makes invalid UTF-8 throw instead of silently inserting U+FFFD.
    return new TextDecoder('utf-8', { fatal: true }).decode(bytes);
  } catch {
    return byteString;
  }
}

/**
 * Returns true when the content type's media type is `application/json`,
 * ignoring letter case, surrounding whitespace and any `;`-separated parameters.
 */
function isJsonContentType(contentType: string | undefined): boolean {
  if (!contentType) {
    return false;
  }
  const mediaType = (contentType.split(';', 1)[0] ?? '').trim().toLowerCase();
  return mediaType === 'application/json';
}
64

65
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * This is a template: the body is intentionally empty and is where the
 * message-handling logic goes.
 *
 * @param messageAsDecodedString - Message payload after base64 decoding, as a string
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 * @param ack_id - Acknowledgment ID for manual acknowledgment (only available for pull delivery, undefined for push)
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>,
  ack_id?: string
) {
  // Template placeholder: handle the message here.
}