/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data (e.g., MQTT, HTTP, SQS) before passing it to `main()`.
 * Common tasks:
 * - Convert binary payloads to string/JSON
 * - Extract metadata
 * - Filter messages
 * - Add timestamps/context
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * This implementation only handles `gcp` triggers. It base64-decodes the
 * message, reads the content type from the message attributes, and parses the
 * body as JSON when the content type is `application/json`.
 *
 * @param wm_trigger - Trigger details (e.g., MQTT, HTTP)
 * @param msg - Raw trigger data; for `gcp` triggers this is the base64-encoded message body
 * @returns Processed data for `main()`: the decoded string, the content type
 *   (if any), the parsed message and the message attributes
 * @throws Error if the trigger kind is not `gcp`, if the `gcp` details are
 *   missing, if `msg` is not valid base64, or if a JSON payload cannot be parsed
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    gcp?: {
      attributes?: Record<string, string>,
      message_id: string,
      subscription: string,
      ordering_key?: string,
      pull?: {
        publish_time?: number,
      },
      push?: {
        headers: Record<string, string>,
        publish_time: string,
      }
    }
  },
  msg: string,
) {
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  if (wm_trigger.kind !== 'gcp') {
    throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`);
  }
  // Report missing details separately; otherwise the message would read "... got: gcp".
  if (!wm_trigger.gcp) {
    throw new Error('Expected gcp trigger details but none were provided');
  }

  const decodedString = decodeBase64Text(msg);
  const attributes = wm_trigger.gcp.attributes || {};
  // `||` (not `??`) so that an empty `content-type` falls through to `Content-Type`.
  const contentType = attributes['content-type'] || attributes['Content-Type'];

  // Non-JSON payloads are passed through as the decoded string.
  let parsedMessage: any = decodedString;
  if (isJsonContentType(contentType)) {
    try {
      parsedMessage = JSON.parse(decodedString);
    } catch (err) {
      throw new Error(`Invalid JSON payload: ${err}`);
    }
  }

  return {
    messageAsDecodedString: decodedString,
    contentType,
    parsedMessage,
    attributes
  };
}

/**
 * Decodes a base64 string into text.
 *
 * `atob()` yields one character per byte, which corrupts multi-byte UTF-8
 * sequences, so the bytes are re-decoded as strict UTF-8. When the bytes are
 * not valid UTF-8 (e.g. a binary payload) the raw one-character-per-byte string
 * is returned so no data is lost.
 */
function decodeBase64Text(base64: string): string {
  let binary: string;
  try {
    binary = atob(base64);
  } catch (err) {
    throw new Error(`Invalid base64 payload: ${err}`);
  }

  const bytes = new Uint8Array(binary.length);
  for (let i = 0; i < binary.length; i++) {
    bytes[i] = binary.charCodeAt(i);
  }

  try {
    // `fatal: true` makes malformed input throw instead of inserting U+FFFD.
    return new TextDecoder('utf-8', { fatal: true }).decode(bytes);
  } catch {
    return binary;
  }
}

/**
 * Tells whether a content type denotes JSON, ignoring case and any parameters
 * such as `; charset=utf-8`.
 */
function isJsonContentType(contentType: string | undefined): boolean {
  if (!contentType) {
    return false;
  }
  const mediaType = (contentType.split(';', 1)[0] ?? '').trim().toLowerCase();
  return mediaType === 'application/json';
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param messageAsDecodedString - Message payload after base64 decoding
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>
) {
  // Template body left empty on purpose: add the message handling logic here.
}