Kafka trigger script with preprocessor template

Script windmill Verified

by dieriba.pro916 · 3/3/2025

The script

Submitted by dieriba.pro916 Bun
Verified 369 days ago
1
/**
 * Kafka Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It takes the raw Kafka trigger event, decodes the base64 payload as UTF-8
 * text and parses that text as JSON before passing both to main().
 *
 * The returned object determines main() parameters:
 * - {a: 1, b: 2} → main(a, b)
 * - {msg, data} → main(msg, data)
 *
 * @param event - Kafka trigger data; `payload` is the base64 encoded message
 * @returns `msg` (decoded message text) and `data` (parsed JSON value) for main()
 * @throws Error when `event.kind` is not 'kafka', or when the payload is not
 *   valid base64 / does not contain valid JSON
 */
export async function preprocessor(
  event: {
    kind: 'kafka',
    payload: string, // base64 encoded payload
    brokers: string[],
    topic: string,
    group_id: string
  },
) {
  // Guard first so the happy path below is not nested.
  if (event.kind !== 'kafka') {
    throw new Error(`Expected kafka trigger kind, got: ${event.kind}`);
  }

  try {
    // Assuming the message received is a JSON value.
    // atob() yields a "binary string" (one character per byte), so the bytes
    // must be re-interpreted as UTF-8; otherwise non-ASCII text is corrupted.
    const binary = atob(event.payload);
    const bytes = Uint8Array.from(binary, (ch) => ch.charCodeAt(0));
    const msg = new TextDecoder().decode(bytes);
    const data = JSON.parse(msg);

    return {
      msg,
      data,
    };
  } catch (error) {
    // Keep the underlying reason so bad base64 and bad JSON can be told apart.
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to parse Kafka message as JSON: ${reason}`);
  }
}
46

47
/**
 * Main Function - entry point for the processed Kafka trigger event
 *
 * ⚠️ Invoked AFTER preprocessor(), receiving the values it returned.
 *
 * @param msg - Decoded message content
 * @param data - Parsed JSON data from the Kafka message
 */
export async function main(msg: string, data: any): Promise<void> {
  // Implement the main function logic here
}
61