Edits history of script submission #11476 for ' GCP Pub/Sub script with preprocessor template (windmill)'

  • bun
    One script reply has been approved by the moderators
    Ap­pro­ved
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param event - Trigger data and details (e.g., MQTT, HTTP)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      event: {
        payload: string,
        kind: 'gcp',
        message_id: string,
        subscription: string,
        ordering_key?: string,
        attributes?: Record<string, string>,
        delivery_type: "push" | "pull",
        headers?: Record<string, string>,
        publish_time?: string,
        ack_id?: string // acknowledgment ID for manual acknowledgment (only for pull delivery)
      },
    ) {
      if (event.kind === 'gcp') {
        const decodedString = atob(event.payload); // Direct decode, assume valid base64
    
        const attributes = event.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedMessage: any = decodedString;
    
        if (isJson) {
          try {
            parsedMessage = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          messageAsDecodedString: decodedString,
          contentType,
          parsedMessage,
          attributes,
          ack_id: event.ack_id
        };
      }
    
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw base64 encoded payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     * @param ack_id - Acknowledgment ID for manual acknowledgment (only available for pull delivery, undefined for push)
     */
    export async function main(
      messageAsDecodedString: string,
      contentType?: string,
      parsedMessage?: any,
      attributes?: Record<string, string>,
      ack_id?: string
    ) {
    
    }

    Submitted by dieriba.pro916 277 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param event - Trigger data and details (e.g., MQTT, HTTP)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      event: {
        payload: string,
        kind: 'gcp',
        message_id: string,
        subscription: string,
        ordering_key?: string,
        attributes?: Record<string, string>,
        delivery_type: "push" | "pull",
        headers?: Record<string, string>,
        publish_time?: string,
      },
    ) {
      if (event.kind === 'gcp') {
        const decodedString = atob(event.payload); // Direct decode, assume valid base64
    
        const attributes = event.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedMessage: any = decodedString;
    
        if (isJson) {
          try {
            parsedMessage = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          messageAsDecodedString: decodedString,
          contentType,
          parsedMessage,
          attributes
        };
      }
    
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw base64 encoded payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      messageAsDecodedString: string,
      contentType?: string,
      parsedMessage?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by hugo697 369 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param event - Trigger data and details (e.g., MQTT, HTTP)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      event: {
        payload: string,
        kind: 'gcp',
        message_id: string,
        subscription: string,
        ordering_key?: string,
        attributes?: Record<string, string>,
        delivery_type: "push" | "pull",
        headers?: Record<string, string>,
        publish_time?: string,
      },
    ) {
      if (event.kind === 'gcp') {
        const decodedString = atob(event.payload); // Direct decode, assume valid base64
    
        const attributes = event.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedMessage: any = decodedString;
    
        if (isJson) {
          try {
            parsedMessage = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          messageAsDecodedString: decodedString,
          contentType,
          parsedMessage,
          attributes
        };
      }
    
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw base64 encoded payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      messageAsDecodedString: string,
      contentType?: string,
      parsedMessage?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by hugo697 369 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param event - Trigger data and details (e.g., MQTT, HTTP)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      event: {
        payload: string,
        kind: 'gcp',
        message_id: string,
        subscription: string,
        ordering_key?: string,
        attributes?: Record<string, string>,
        delivery_type: "push" | "pull",
        headers?: Record<string, string>,
        publish_time?: string,
      },
    ) {
      if (event.kind === 'gcp') {
        const decodedString = atob(event.payload); // Direct decode, assume valid base64
    
        const attributes = event.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedMessage: any = decodedString;
    
        if (isJson) {
          try {
            parsedMessage = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          messageAsDecodedString: decodedString,
          contentType,
          parsedMessage,
          attributes
        };
      }
    
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw binary payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      messageAsDecodedString: string,
      contentType?: string,
      parsedMessage?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by hugo697 369 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param event - Trigger data and details (e.g., MQTT, HTTP)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      event: {
        payload: string,
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        message_id: string,
        subscription: string,
        ordering_key?: string,
        attributes?: Record<string, string>,
        delivery_type: "push" | "pull",
        headers?: Record<string, string>,
        publish_time?: string,
      },
    ) {
      if (event.kind === 'gcp') {
        const decodedString = atob(event.payload); // Direct decode, assume valid base64
    
        const attributes = event.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedMessage: any = decodedString;
    
        if (isJson) {
          try {
            parsedMessage = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          messageAsDecodedString: decodedString,
          contentType,
          parsedMessage,
          attributes
        };
      }
    
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw binary payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      messageAsDecodedString: string,
      contentType?: string,
      parsedMessage?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by hugo697 369 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
     * @param payload - Raw trigger data (format varies by trigger type)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      wm_trigger: {
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        gcp?: {
          message_id: string,
          subscription: string,
          ordering_key?: string,
          attributes?: Record<string, string>,
          delivery_type: "push" | "pull",
          headers?: Record<string, string>,
          publish_time?: string,
        }
      },
      payload: string,
    ) {
      if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
        const decodedString = atob(payload); // Direct decode, assume valid base64
    
        const attributes = wm_trigger.gcp.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedMessage: any = decodedString;
    
        if (isJson) {
          try {
            parsedMessage = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          messageAsDecodedString: decodedString,
          contentType,
          parsedMessage,
          attributes
        };
      }
    
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw binary payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      messageAsDecodedString: string,
      contentType?: string,
      parsedMessage?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by dieriba.pro916 397 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
     * @param payload - Raw trigger data (format varies by trigger type)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      wm_trigger: {
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        gcp?: {
          message_id: string,
          subscription: string,
          ordering_key?: string,
          attributes?: Record<string, string>,
          delivery_type: "push" | "pull",
          headers?: Record<string, string>,
          publish_time?: string,
        }
      },
      msg: string,
    ) {
      if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
        const decodedString = atob(msg); // Direct decode, assume valid base64
    
        const attributes = wm_trigger.gcp.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedMessage: any = decodedString;
    
        if (isJson) {
          try {
            parsedMessage = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          messageAsDecodedString: decodedString,
          contentType,
          parsedMessage,
          attributes
        };
      }
    
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw binary payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      messageAsDecodedString: string,
      contentType?: string,
      parsedMessage?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by dieriba.pro916 399 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
     * @param payload - Raw trigger data (format varies by trigger type)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      wm_trigger: {
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        gcp?: {
          attributes?: Record<string, string>,
          message_id: string,
          subscription: string,
          ordering_key?: string,
          pull?: {
            publish_time?: number,
          },
          push?: {
            headers: Record<string, string>,
            publish_time: string,
          }
        }
      },
      msg: string,
    ) {
      if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
        const decodedString = atob(msg); // Direct decode, assume valid base64
    
        const attributes = wm_trigger.gcp.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedMessage: any = decodedString;
    
        if (isJson) {
          try {
            parsedMessage = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          messageAsDecodedString: decodedString,
          contentType,
          parsedMessage,
          attributes
        };
      }
    
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw binary payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedMessage - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      messageAsDecodedString: string,
      contentType?: string,
      parsedMessage?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by dieriba.pro916 401 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
     * @param payload - Raw trigger data (format varies by trigger type)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      wm_trigger: {
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        gcp?: {
          attributes?: Record<string, string>,
          message_id: string,
          subscription: string,
          ordering_key?: string,
          pull?: {
            publish_time?: number,
          },
          push?: {
            headers: Record<string, string>,
            publish_time: string,
          }
        }
      },
      payload: string,
    ) {
      if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
        const decodedString = atob(payload); // Direct decode, assume valid base64
    
        const attributes = wm_trigger.gcp.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedPayload: any = decodedString;
    
        if (isJson) {
          try {
            parsedPayload = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          payloadAsDecodedString: decodedString,
          contentType,
          parsedPayload,
          attributes
        };
      }
    
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw binary payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedPayload - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      payloadAsDecodedString: string,
      contentType?: string,
      parsedPayload?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by dieriba.pro916 401 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * Assumes payload is valid standard base64 (Pub/Sub guarantee).
     */
    export async function preprocessor(
      wm_trigger: {
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        gcp?: {
          attributes?: Record<string, string>,
          message_id: string,
          subscription: string,
          ordering_key?: string,
          pull?: {
            publish_time?: number,
          },
          push?: {
            headers: Record<string, string>,
            publish_time: string,
          }
        }
      },
      payload: string, // base64 encoded string
    ) {
      if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
        const decodedString = atob(payload); // Direct decode, assume valid base64
    
        const attributes = wm_trigger.gcp.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedPayload: any = decodedString;
    
        if (isJson) {
          try {
            parsedPayload = JSON.parse(decodedString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          payloadAsString: decodedString,
          contentType,
          parsedPayload,
          attributes
        };
      }
    
      throw new Error(`Expected gcp trigger kind, got: ${wm_trigger.kind}`);
    }

    Submitted by dieriba.pro916 401 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
     * @param payload - Raw trigger data (format varies by trigger type)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      wm_trigger: {
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        gcp?: {
          attributes?: Record<string, string>,
          message_id: string,
          subscription: string,
          ordering_key?: string,
          pull?: {
            publish_time?: number,
          },
          push?: {
            headers: Record<string, string>,
            publish_time: string,
          }
        }
      },
      payload: Array<number>,
    ) {
      if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
        const uint8Payload = new Uint8Array(payload);
        const payloadAsString = new TextDecoder().decode(uint8Payload);
    
        const attributes = wm_trigger.gcp.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedPayload: any = payloadAsString;
    
        if (isJson) {
          try {
            parsedPayload = JSON.parse(payloadAsString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          payload: uint8Payload,
          payloadAsString,
          contentType,
          parsedPayload,
          attributes
        };
      }
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw binary payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedPayload - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      payload: Uint8Array,
      payloadAsString: string,
      contentType?: string,
      parsedPayload?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by dieriba.pro916 404 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
     * @param payload - Raw trigger data (format varies by trigger type)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      wm_trigger: {
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        gcp?: {
          attributes?: Record<string, string>,
          message_id: string,
          pull?: {
            publish_time?: number,
            ordering_key?: string,
          },
          push?: {
            headers: Record<string, string>,
            publish_time: string,
            subscription: string
          }
        }
      },
      payload: Array<number>,
    ) {
      if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
        const uint8Payload = new Uint8Array(payload);
        const payloadAsString = new TextDecoder().decode(uint8Payload);
    
        const attributes = wm_trigger.gcp.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedPayload: any = payloadAsString;
    
        if (isJson) {
          try {
            parsedPayload = JSON.parse(payloadAsString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          payload: uint8Payload,
          payloadAsString,
          contentType,
          parsedPayload,
          attributes
        };
      }
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw binary payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedPayload - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      payload: Uint8Array,
      payloadAsString: string,
      contentType?: string,
      parsedPayload?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by dieriba.pro916 404 days ago

  • bun
    /**
     * General Trigger Preprocessor
     * 
     * ⚠️ This function runs BEFORE the main function.
     * 
     * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
     * Common tasks:
     * - Convert binary payloads to string/JSON
     * - Extract metadata
     * - Filter messages
     * - Add timestamps/context
     * 
     * The returned object determines `main()` parameters:
     * - `{a: 1, b: 2}` → `main(a, b)`
     * - `{payload}` → `main(payload)`
     * 
     * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
     * @param payload - Raw trigger data (format varies by trigger type)
     * @returns Processed data for `main()`
     */
    export async function preprocessor(
      wm_trigger: {
        kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
        gcp?: {
          attributes?: Record<string, string>,
          message_id: string,
          publish_time?: number,
          ordering_key?: string,
          push?: {
            headers: Record<string, string>,
            publish_time: string
          }
        }
      },
      payload: Array<number>,
    ) {
      if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
        const uint8Payload = new Uint8Array(payload);
        const payloadAsString = new TextDecoder().decode(uint8Payload);
    
        const attributes = wm_trigger.gcp.attributes || {};
        const contentType = attributes['content-type'] || attributes['Content-Type'];
        const isJson = contentType === 'application/json';
    
        let parsedPayload: any = payloadAsString;
    
        if (isJson) {
          try {
            parsedPayload = JSON.parse(payloadAsString);
          } catch (err) {
            throw new Error(`Invalid JSON payload: ${err}`);
          }
        }
    
        return {
          payload: uint8Payload,
          payloadAsString,
          contentType,
          parsedPayload,
          attributes
        };
      }
      // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
      // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
      throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
    }
    
    /**
     * Main Function - Handles processed trigger events
     * 
     * ⚠️ Called AFTER `preprocessor()`, with its return values.
     * 
     * @param payload - Raw binary payload
     * @param payloadAsString - Decoded string payload
     * @param contentType - Content type from message attributes (e.g., application/json)
     * @param parsedPayload - Parsed object if payload is JSON, otherwise same as payloadAsString
     * @param attributes - Key-value metadata sent with the message
     */
    export async function main(
      payload: Uint8Array,
      payloadAsString: string,
      contentType?: string,
      parsedPayload?: any,
      attributes?: Record<string, string>
    ) {
    
    }

    Submitted by dieriba.pro916 404 days ago