GCP Pub/Sub trigger script template

Script windmill

by dieriba.pro916 · 4/6/2025

  • Submitted by dieriba.pro916 Bun
    Created 401 days ago
    1
    /**
     * GCP Pub/Sub Trigger Preprocessor
     *
     * ⚠️ This function runs BEFORE the main function.
     *
     * It turns the raw, base64-encoded Pub/Sub message into the arguments of `main()`:
     * - Decodes the base64 payload into a UTF-8 string
     * - Reads the content type from the message attributes
     * - Parses the payload as JSON when the content type is `application/json`
     *
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     *
     * @param wm_trigger - Trigger details; only `kind === 'gcp'` with a `gcp` section is handled
     * @param msg - Raw message data as a base64-encoded string
     * @returns `{ messageAsDecodedString, contentType, parsedMessage, attributes }` for `main()`
     * @throws Error if `msg` is not valid base64, if a JSON payload cannot be parsed,
     *         or if the trigger is not a GCP trigger
     */
    export async function preprocessor(
      wm_trigger: {
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        gcp?: {
          attributes?: Record<string, string>,
          message_id: string,
          subscription: string,
          ordering_key?: string,
          pull?: {
            publish_time?: number,
          },
          push?: {
            headers: Record<string, string>,
            publish_time: string,
          }
        }
      },
      msg: string,
    ) {
      if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
        let decodedString: string;
        try {
          // `atob` alone yields one character per *byte* (Latin-1), which corrupts
          // multi-byte UTF-8 text. Rebuild the bytes and decode them as UTF-8.
          // Byte sequences that are not valid UTF-8 become U+FFFD.
          const binary = atob(msg);
          const bytes = Uint8Array.from(binary, (ch) => ch.charCodeAt(0));
          decodedString = new TextDecoder().decode(bytes);
        } catch (err) {
          throw new Error(`Invalid base64 payload: ${err}`);
        }

        const attributes = wm_trigger.gcp.attributes ?? {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];

        // Compare only the media type so that parameters (e.g. `; charset=utf-8`)
        // and letter case do not prevent JSON detection.
        const [mediaType = ''] = (contentType ?? '').split(';');
        const isJson = mediaType.trim().toLowerCase() === 'application/json';

        // Kept as `any` so the shape handed to `main(parsedMessage?: any)` is unchanged.
        let parsedMessage: any = decodedString;
        if (isJson) {
          try {
            parsedMessage = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }

        return {
          messageAsDecodedString: decodedString,
          contentType,
          parsedMessage,
          attributes
        };
      }

      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
    }
    67
    
    
    68
    /**
     * Main Function - Handles processed trigger events
     *
     * ⚠️ Called AFTER `preprocessor()`, with its return values. The parameter names
     * must match the keys of the object returned by `preprocessor()`.
     *
     * @param messageAsDecodedString - Message payload decoded from base64 to a string
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      messageAsDecodedString: string,
      contentType?: string,
      parsedMessage?: any,
      attributes?: Record<string, string>
    ) {
      // Template placeholder: add the message handling logic here.
    }