/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes the raw GCP trigger event before passing it to `main()`:
 * - base64-decodes `event.payload` and interprets the bytes as UTF-8 text
 * - reads the content type from the `content-type` / `Content-Type` attribute
 * - JSON-parses the text when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param event - Trigger data and details of the GCP message; `payload` is base64 encoded
 * @returns Processed data for `main()`: `messageAsDecodedString`, `contentType`,
 *          `parsedMessage` (parsed JSON, otherwise the decoded string), `attributes` and `ack_id`
 * @throws Error when a JSON-typed payload cannot be parsed, or when `event.kind` is not `'gcp'`
 */
export async function preprocessor(
  event: {
    payload: string,
    kind: 'gcp',
    message_id: string,
    subscription: string,
    ordering_key?: string,
    attributes?: Record<string, string>,
    delivery_type: "push" | "pull",
    headers?: Record<string, string>,
    publish_time?: string,
    ack_id?: string // acknowledgment ID for manual acknowledgment (only for pull delivery)
  },
) {
  if (event.kind === 'gcp') {
    // atob() returns one character per decoded byte (assumes valid base64). Rebuild the
    // bytes and decode them as UTF-8 so multi-byte characters are not mangled.
    const payloadBytes = Uint8Array.from(atob(event.payload), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(payloadBytes);
    const attributes = event.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Compare only the media type: ignore letter case and parameters like "; charset=utf-8".
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedMessage: any = decodedString;
    if (isJson) {
      try {
        parsedMessage = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      messageAsDecodedString: decodedString,
      contentType,
      parsedMessage,
      attributes,
      ack_id: event.ack_id
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param messageAsDecodedString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 * @param ack_id - Acknowledgment ID for manual acknowledgment (only available for pull delivery, undefined for push)
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>,
  ack_id?: string
) {
  // Placeholder: no message handling is implemented yet, so this resolves to undefined.
}
// Submitted by dieriba.pro916 277 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes the raw GCP trigger event before passing it to `main()`:
 * - base64-decodes `event.payload` and interprets the bytes as UTF-8 text
 * - reads the content type from the `content-type` / `Content-Type` attribute
 * - JSON-parses the text when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param event - Trigger data and details of the GCP message; `payload` is base64 encoded
 * @returns Processed data for `main()`: `messageAsDecodedString`, `contentType`,
 *          `parsedMessage` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when `event.kind` is not `'gcp'`
 */
export async function preprocessor(
  event: {
    payload: string,
    kind: 'gcp',
    message_id: string,
    subscription: string,
    ordering_key?: string,
    attributes?: Record<string, string>,
    delivery_type: "push" | "pull",
    headers?: Record<string, string>,
    publish_time?: string,
  },
) {
  if (event.kind === 'gcp') {
    // atob() returns one character per decoded byte (assumes valid base64). Rebuild the
    // bytes and decode them as UTF-8 so multi-byte characters are not mangled.
    const payloadBytes = Uint8Array.from(atob(event.payload), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(payloadBytes);
    const attributes = event.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Compare only the media type: ignore letter case and parameters like "; charset=utf-8".
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedMessage: any = decodedString;
    if (isJson) {
      try {
        parsedMessage = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      messageAsDecodedString: decodedString,
      contentType,
      parsedMessage,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param messageAsDecodedString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: no message handling is implemented yet, so this resolves to undefined.
}
// Submitted by hugo697 369 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It turns the raw GCP trigger event into the arguments of `main()`:
 * - the base64 `event.payload` is decoded and read as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param event - Trigger data and details of the GCP message; `payload` is base64 encoded
 * @returns Processed data for `main()`: `messageAsDecodedString`, `contentType`,
 *          `parsedMessage` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when `event.kind` is not `'gcp'`
 */
export async function preprocessor(
  event: {
    payload: string,
    kind: 'gcp',
    message_id: string,
    subscription: string,
    ordering_key?: string,
    attributes?: Record<string, string>,
    delivery_type: "push" | "pull",
    headers?: Record<string, string>,
    publish_time?: string,
  },
) {
  if (event.kind === 'gcp') {
    // atob() yields a "binary string" (one char per byte, assumes valid base64);
    // convert it back to bytes and decode as UTF-8 to keep non-ASCII text intact.
    const payloadBytes = Uint8Array.from(atob(event.payload), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(payloadBytes);
    const attributes = event.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Only the media type decides whether to parse: case and "; charset=…" are irrelevant.
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedMessage: any = decodedString;
    if (isJson) {
      try {
        parsedMessage = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      messageAsDecodedString: decodedString,
      contentType,
      parsedMessage,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param messageAsDecodedString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: the handler body is empty, so the returned promise resolves to undefined.
}
// Submitted by hugo697 369 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It prepares the raw GCP trigger event for `main()`:
 * - decodes the base64 `event.payload` into UTF-8 text
 * - looks up the content type under `content-type`, then `Content-Type`
 * - JSON-parses the text when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param event - Trigger data and details of the GCP message; `payload` is base64 encoded
 * @returns Processed data for `main()`: `messageAsDecodedString`, `contentType`,
 *          `parsedMessage` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when `event.kind` is not `'gcp'`
 */
export async function preprocessor(
  event: {
    payload: string,
    kind: 'gcp',
    message_id: string,
    subscription: string,
    ordering_key?: string,
    attributes?: Record<string, string>,
    delivery_type: "push" | "pull",
    headers?: Record<string, string>,
    publish_time?: string,
  },
) {
  if (event.kind === 'gcp') {
    // atob() gives one character per byte (assumes valid base64); map those back to
    // bytes and decode as UTF-8, otherwise multi-byte characters come out garbled.
    const payloadBytes = Uint8Array.from(atob(event.payload), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(payloadBytes);
    const attributes = event.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Strip parameters ("; charset=utf-8") and normalise case before comparing.
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedMessage: any = decodedString;
    if (isJson) {
      try {
        parsedMessage = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      messageAsDecodedString: decodedString,
      contentType,
      parsedMessage,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param messageAsDecodedString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: nothing is done with the message yet; the promise resolves to undefined.
}
// Submitted by hugo697 369 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data before passing it to `main()`. Only events whose
 * `kind` is `'gcp'` are handled:
 * - the base64 `event.payload` is decoded and read as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param event - Trigger data and details; `payload` is base64 encoded
 * @returns Processed data for `main()`: `messageAsDecodedString`, `contentType`,
 *          `parsedMessage` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when `event.kind` is not `'gcp'`
 */
export async function preprocessor(
  event: {
    payload: string,
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    message_id: string,
    subscription: string,
    ordering_key?: string,
    attributes?: Record<string, string>,
    delivery_type: "push" | "pull",
    headers?: Record<string, string>,
    publish_time?: string,
  },
) {
  if (event.kind === 'gcp') {
    // atob() returns one character per decoded byte (assumes valid base64). Rebuild the
    // bytes and decode them as UTF-8 so multi-byte characters are not mangled.
    const payloadBytes = Uint8Array.from(atob(event.payload), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(payloadBytes);
    const attributes = event.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Compare only the media type: ignore letter case and parameters like "; charset=utf-8".
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedMessage: any = decodedString;
    if (isJson) {
      try {
        parsedMessage = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      messageAsDecodedString: decodedString,
      contentType,
      parsedMessage,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${event.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param messageAsDecodedString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: no handling logic yet; awaiting this function yields undefined.
}
// Submitted by hugo697 369 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data before passing it to `main()`. Only GCP triggers
 * (`wm_trigger.kind === 'gcp'` with `wm_trigger.gcp` present) are handled:
 * - the base64 `payload` is decoded and read as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param wm_trigger - Trigger details; GCP message metadata lives under `gcp`
 * @param payload - Base64 encoded message data
 * @returns Processed data for `main()`: `messageAsDecodedString`, `contentType`,
 *          `parsedMessage` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when the trigger is not a GCP trigger
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    gcp?: {
      message_id: string,
      subscription: string,
      ordering_key?: string,
      attributes?: Record<string, string>,
      delivery_type: "push" | "pull",
      headers?: Record<string, string>,
      publish_time?: string,
    }
  },
  payload: string,
) {
  if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
    // atob() returns one character per decoded byte (assumes valid base64). Rebuild the
    // bytes and decode them as UTF-8 so multi-byte characters are not mangled.
    const payloadBytes = Uint8Array.from(atob(payload), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(payloadBytes);
    const attributes = wm_trigger.gcp.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Compare only the media type: ignore letter case and parameters like "; charset=utf-8".
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedMessage: any = decodedString;
    if (isJson) {
      try {
        parsedMessage = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      messageAsDecodedString: decodedString,
      contentType,
      parsedMessage,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param messageAsDecodedString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: no message handling is implemented yet, so this resolves to undefined.
}
// Submitted by dieriba.pro916 397 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data before passing it to `main()`. Only GCP triggers
 * (`wm_trigger.kind === 'gcp'` with `wm_trigger.gcp` present) are handled:
 * - the base64 `msg` is decoded and read as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param wm_trigger - Trigger details; GCP message metadata lives under `gcp`
 * @param msg - Base64 encoded message data
 * @returns Processed data for `main()`: `messageAsDecodedString`, `contentType`,
 *          `parsedMessage` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed message cannot be parsed, or when the trigger is not a GCP trigger
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    gcp?: {
      message_id: string,
      subscription: string,
      ordering_key?: string,
      attributes?: Record<string, string>,
      delivery_type: "push" | "pull",
      headers?: Record<string, string>,
      publish_time?: string,
    }
  },
  msg: string,
) {
  if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
    // atob() yields a "binary string" (one char per byte, assumes valid base64);
    // convert it back to bytes and decode as UTF-8 to keep non-ASCII text intact.
    const msgBytes = Uint8Array.from(atob(msg), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(msgBytes);
    const attributes = wm_trigger.gcp.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Only the media type decides whether to parse: case and "; charset=…" are irrelevant.
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedMessage: any = decodedString;
    if (isJson) {
      try {
        parsedMessage = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      messageAsDecodedString: decodedString,
      contentType,
      parsedMessage,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param messageAsDecodedString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: the handler body is empty, so the returned promise resolves to undefined.
}
// Submitted by dieriba.pro916 399 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data before passing it to `main()`. Only GCP triggers
 * (`wm_trigger.kind === 'gcp'` with `wm_trigger.gcp` present) are handled:
 * - the base64 `msg` is decoded and read as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param wm_trigger - Trigger details; GCP message metadata lives under `gcp`
 * @param msg - Base64 encoded message data
 * @returns Processed data for `main()`: `messageAsDecodedString`, `contentType`,
 *          `parsedMessage` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed message cannot be parsed, or when the trigger is not a GCP trigger
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    gcp?: {
      attributes?: Record<string, string>,
      message_id: string,
      subscription: string,
      ordering_key?: string,
      pull?: {
        publish_time?: number,
      },
      push?: {
        headers: Record<string, string>,
        publish_time: string,
      }
    }
  },
  msg: string,
) {
  if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
    // atob() gives one character per byte (assumes valid base64); map those back to
    // bytes and decode as UTF-8, otherwise multi-byte characters come out garbled.
    const msgBytes = Uint8Array.from(atob(msg), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(msgBytes);
    const attributes = wm_trigger.gcp.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Strip parameters ("; charset=utf-8") and normalise case before comparing.
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedMessage: any = decodedString;
    if (isJson) {
      try {
        parsedMessage = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      messageAsDecodedString: decodedString,
      contentType,
      parsedMessage,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param messageAsDecodedString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedMessage - Parsed object if payload is JSON, otherwise same as messageAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  messageAsDecodedString: string,
  contentType?: string,
  parsedMessage?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: nothing is done with the message yet; the promise resolves to undefined.
}
// Submitted by dieriba.pro916 401 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data before passing it to `main()`. Only GCP triggers
 * (`wm_trigger.kind === 'gcp'` with `wm_trigger.gcp` present) are handled:
 * - the base64 `payload` is decoded and read as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param wm_trigger - Trigger details; GCP message metadata lives under `gcp`
 * @param payload - Base64 encoded message data
 * @returns Processed data for `main()`: `payloadAsDecodedString`, `contentType`,
 *          `parsedPayload` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when the trigger is not a GCP trigger
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    gcp?: {
      attributes?: Record<string, string>,
      message_id: string,
      subscription: string,
      ordering_key?: string,
      pull?: {
        publish_time?: number,
      },
      push?: {
        headers: Record<string, string>,
        publish_time: string,
      }
    }
  },
  payload: string,
) {
  if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
    // atob() returns one character per decoded byte (assumes valid base64). Rebuild the
    // bytes and decode them as UTF-8 so multi-byte characters are not mangled.
    const payloadBytes = Uint8Array.from(atob(payload), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(payloadBytes);
    const attributes = wm_trigger.gcp.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Compare only the media type: ignore letter case and parameters like "; charset=utf-8".
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedPayload: any = decodedString;
    if (isJson) {
      try {
        parsedPayload = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      payloadAsDecodedString: decodedString,
      contentType,
      parsedPayload,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param payloadAsDecodedString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedPayload - Parsed object if payload is JSON, otherwise same as payloadAsDecodedString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  payloadAsDecodedString: string,
  contentType?: string,
  parsedPayload?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: no payload handling is implemented yet, so this resolves to undefined.
}
// Submitted by dieriba.pro916 401 days ago
/**
 * General Trigger Preprocessor
 *
 * Assumes payload is valid standard base64 (Pub/Sub guarantee).
 *
 * Only GCP triggers (`wm_trigger.kind === 'gcp'` with `wm_trigger.gcp` present) are handled:
 * - the base64 `payload` is decoded and read as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * @param wm_trigger - Trigger details; GCP message metadata lives under `gcp`
 * @param payload - Base64 encoded message data
 * @returns `payloadAsString`, `contentType`, `parsedPayload` (parsed JSON, otherwise
 *          the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when the trigger is not a GCP trigger
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    gcp?: {
      attributes?: Record<string, string>,
      message_id: string,
      subscription: string,
      ordering_key?: string,
      pull?: {
        publish_time?: number,
      },
      push?: {
        headers: Record<string, string>,
        publish_time: string,
      }
    }
  },
  payload: string, // base64 encoded string
) {
  if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
    // atob() returns one character per decoded byte. Rebuild the bytes and decode
    // them as UTF-8 so multi-byte characters are not mangled.
    const payloadBytes = Uint8Array.from(atob(payload), (ch) => ch.charCodeAt(0));
    const decodedString = new TextDecoder().decode(payloadBytes);
    const attributes = wm_trigger.gcp.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Compare only the media type: ignore letter case and parameters like "; charset=utf-8".
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedPayload: any = decodedString;
    if (isJson) {
      try {
        parsedPayload = JSON.parse(decodedString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      payloadAsString: decodedString,
      contentType,
      parsedPayload,
      attributes
    };
  }
  throw new Error(`Expected gcp trigger kind, got: ${wm_trigger.kind}`);
}
// Submitted by dieriba.pro916 401 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data before passing it to `main()`. Only GCP triggers
 * (`wm_trigger.kind === 'gcp'` with `wm_trigger.gcp` present) are handled:
 * - the byte values in `payload` are copied into a `Uint8Array` and decoded as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param wm_trigger - Trigger details; GCP message metadata lives under `gcp`
 * @param payload - Raw message data as an array of byte values
 * @returns Processed data for `main()`: `payload` (bytes), `payloadAsString`, `contentType`,
 *          `parsedPayload` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when the trigger is not a GCP trigger
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    gcp?: {
      attributes?: Record<string, string>,
      message_id: string,
      subscription: string,
      ordering_key?: string,
      pull?: {
        publish_time?: number,
      },
      push?: {
        headers: Record<string, string>,
        publish_time: string,
      }
    }
  },
  payload: Array<number>,
) {
  if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
    const uint8Payload = new Uint8Array(payload);
    const payloadAsString = new TextDecoder().decode(uint8Payload);
    const attributes = wm_trigger.gcp.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Compare only the media type: ignore letter case and parameters like "; charset=utf-8".
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedPayload: any = payloadAsString;
    if (isJson) {
      try {
        parsedPayload = JSON.parse(payloadAsString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      payload: uint8Payload,
      payloadAsString,
      contentType,
      parsedPayload,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param payload - Raw binary payload
 * @param payloadAsString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedPayload - Parsed object if payload is JSON, otherwise same as payloadAsString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  payload: Uint8Array,
  payloadAsString: string,
  contentType?: string,
  parsedPayload?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: no payload handling is implemented yet, so this resolves to undefined.
}
// Submitted by dieriba.pro916 404 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data before passing it to `main()`. Only GCP triggers
 * (`wm_trigger.kind === 'gcp'` with `wm_trigger.gcp` present) are handled:
 * - the byte values in `payload` are copied into a `Uint8Array` and decoded as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param wm_trigger - Trigger details; GCP message metadata lives under `gcp`
 * @param payload - Raw message data as an array of byte values
 * @returns Processed data for `main()`: `payload` (bytes), `payloadAsString`, `contentType`,
 *          `parsedPayload` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when the trigger is not a GCP trigger
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    gcp?: {
      attributes?: Record<string, string>,
      message_id: string,
      pull?: {
        publish_time?: number,
        ordering_key?: string,
      },
      push?: {
        headers: Record<string, string>,
        publish_time: string,
        subscription: string
      }
    }
  },
  payload: Array<number>,
) {
  if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
    const uint8Payload = new Uint8Array(payload);
    const payloadAsString = new TextDecoder().decode(uint8Payload);
    const attributes = wm_trigger.gcp.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Only the media type decides whether to parse: case and "; charset=…" are irrelevant.
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedPayload: any = payloadAsString;
    if (isJson) {
      try {
        parsedPayload = JSON.parse(payloadAsString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      payload: uint8Payload,
      payloadAsString,
      contentType,
      parsedPayload,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param payload - Raw binary payload
 * @param payloadAsString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedPayload - Parsed object if payload is JSON, otherwise same as payloadAsString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  payload: Uint8Array,
  payloadAsString: string,
  contentType?: string,
  parsedPayload?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: the handler body is empty, so the returned promise resolves to undefined.
}
// Submitted by dieriba.pro916 404 days ago
/**
 * General Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It processes raw trigger data before passing it to `main()`. Only GCP triggers
 * (`wm_trigger.kind === 'gcp'` with `wm_trigger.gcp` present) are handled:
 * - the byte values in `payload` are copied into a `Uint8Array` and decoded as UTF-8 text
 * - the content type comes from the `content-type` / `Content-Type` attribute
 * - the text is JSON-parsed when the content type is `application/json`
 *   (letter case and parameters such as `; charset=utf-8` are ignored)
 *
 * The returned object determines `main()` parameters:
 * - `{a: 1, b: 2}` → `main(a, b)`
 * - `{payload}` → `main(payload)`
 *
 * @param wm_trigger - Trigger details; GCP message metadata lives under `gcp`
 * @param payload - Raw message data as an array of byte values
 * @returns Processed data for `main()`: `payload` (bytes), `payloadAsString`, `contentType`,
 *          `parsedPayload` (parsed JSON, otherwise the decoded string) and `attributes`
 * @throws Error when a JSON-typed payload cannot be parsed, or when the trigger is not a GCP trigger
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp',
    gcp?: {
      attributes?: Record<string, string>,
      message_id: string,
      publish_time?: number,
      ordering_key?: string,
      push?: {
        headers: Record<string, string>,
        publish_time: string
      }
    }
  },
  payload: Array<number>,
) {
  if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) {
    const uint8Payload = new Uint8Array(payload);
    const payloadAsString = new TextDecoder().decode(uint8Payload);
    const attributes = wm_trigger.gcp.attributes || {};
    const contentType = attributes['content-type'] || attributes['Content-Type'];
    // Strip parameters ("; charset=utf-8") and normalise case before comparing.
    const mediaType = contentType?.split(';', 1)[0]?.trim().toLowerCase();
    const isJson = mediaType === 'application/json';
    let parsedPayload: any = payloadAsString;
    if (isJson) {
      try {
        parsedPayload = JSON.parse(payloadAsString);
      } catch (err) {
        throw new Error(`Invalid JSON payload: ${err}`);
      }
    }
    return {
      payload: uint8Payload,
      payloadAsString,
      contentType,
      parsedPayload,
      attributes
    };
  }
  // We assume the script is triggered by a GCP message, which is why an error is thrown for other trigger kinds.
  // If the script is intended to support other triggers, update this logic to handle the respective trigger kind.
  throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`)
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER `preprocessor()`, with its return values.
 *
 * @param payload - Raw binary payload
 * @param payloadAsString - Decoded string payload
 * @param contentType - Content type from message attributes (e.g., application/json)
 * @param parsedPayload - Parsed object if payload is JSON, otherwise same as payloadAsString
 * @param attributes - Key-value metadata sent with the message
 */
export async function main(
  payload: Uint8Array,
  payloadAsString: string,
  contentType?: string,
  parsedPayload?: any,
  attributes?: Record<string, string>
) {
  // Placeholder: nothing is done with the payload yet; the promise resolves to undefined.
}
// Submitted by dieriba.pro916 404 days ago