/**
 * Kafka Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It receives the raw Kafka trigger event, decodes the base64 payload as
 * UTF-8 text and parses that text as JSON.
 *
 * The returned object determines main() parameters:
 * - {msg, data} → main(msg, data)
 *
 * @param event - Kafka trigger event; `payload` is the base64-encoded message
 * @returns `msg` (the decoded message text) and `data` (the parsed JSON value)
 * @throws Error when the payload cannot be decoded or parsed (the underlying
 *         error is attached as `cause`), or when `event.kind` is not 'kafka'
 */
export async function preprocessor(
  event: {
    kind: 'kafka',
    payload: string, // base64 encoded payload
    brokers: string[],
    topic: string,
    group_id: string
  },
) {
  if (event.kind === 'kafka') {
    try {
      // atob() yields one character per byte (a "binary string"). Turn it back
      // into raw bytes and decode them as UTF-8 so multi-byte characters
      // (accents, CJK, emoji, ...) are not mangled before JSON parsing.
      const binary = atob(event.payload);
      const bytes = Uint8Array.from(binary, (ch) => ch.charCodeAt(0));
      const msg = new TextDecoder().decode(bytes);
      // Assuming the message received is a JSON value
      const data = JSON.parse(msg);
      return {
        msg,
        data,
      };
    } catch (error) {
      // Keep the stable message, but expose the root cause (invalid base64 or
      // invalid JSON) to whoever inspects the error.
      throw Object.assign(new Error("Failed to parse Kafka message as JSON"), {
        cause: error,
      });
    }
  }
  throw new Error(`Expected kafka trigger kind, got: ${event.kind}`);
}
/**
 * Main Function - entry point for the processed Kafka event
 *
 * ⚠️ Invoked AFTER preprocessor(), receiving the values it returned.
 *
 * @param msg - Message text decoded by the preprocessor
 * @param data - JSON value the preprocessor parsed out of the message
 */
export async function main(
  msg: string,
  data: any,
) {
  // Placeholder: add the business logic for the event here.
}
Submitted by hugo697 369 days ago
/**
 * Kafka Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It checks that the trigger is a Kafka trigger carrying its metadata, parses
 * the received message as JSON and forwards message, parsed data and metadata.
 *
 * The returned object determines main() parameters:
 * - {msg, data, kafka} → main(msg, data, kafka)
 *
 * @param wm_trigger - Trigger details; `kafka` holds brokers, topic and group_id
 * @param msg - Message received from Kafka
 * @returns `msg` unchanged, `data` (the parsed JSON value) and the `kafka` metadata
 * @throws Error when the trigger kind is not 'kafka', when the kafka metadata
 *         is missing, or when `msg` is not valid JSON (the parser error is
 *         attached as `cause`)
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    kafka?: {
      brokers: string[],
      topic: string,
      group_id: string
    }
  },
  msg: string
) {
  if (wm_trigger.kind !== 'kafka') {
    throw new Error(`Expected kafka trigger kind, got: ${wm_trigger.kind}`);
  }
  if (!wm_trigger.kafka) {
    // Reported separately: "Expected kafka ..., got: kafka" would be contradictory.
    throw new Error("Kafka trigger is missing its kafka metadata (brokers, topic, group_id)");
  }

  let data;
  try {
    // Assuming the message received is a JSON value
    data = JSON.parse(msg);
  } catch (error) {
    // Same message as before; the parser's own error is kept as `cause`.
    throw Object.assign(new Error("Failed to parse Kafka message as JSON"), {
      cause: error,
    });
  }

  return {
    msg,
    data,
    kafka: wm_trigger.kafka
  };
}
/**
 * Main Function - entry point for the processed Kafka event
 *
 * ⚠️ Invoked AFTER preprocessor(), receiving the values it returned.
 *
 * @param msg - The message exactly as it was received
 * @param data - JSON value the preprocessor parsed out of the message
 * @param kafka - Kafka metadata: brokers, topic and consumer group id
 */
export async function main(
  msg: string,
  data: any,
  kafka: {
    brokers: string[],
    topic: string,
    group_id: string
  }
) {
  // Placeholder: add the business logic for the event here.
}
Submitted by dieriba.pro916 435 days ago
/**
 * Kafka Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * Validates that the trigger is a Kafka trigger with metadata attached,
 * parses the raw message as JSON, and hands everything on to main().
 *
 * The returned object determines main() parameters:
 * - {msg, data, kafka} → main(msg, data, kafka)
 *
 * @param wm_trigger - Trigger details; `kafka` holds brokers, topic and group_id
 * @param msg - Raw message text, expected to contain a JSON value
 * @returns `msg` unchanged, `data` (the parsed JSON value) and the `kafka` metadata
 * @throws Error when the trigger kind is not 'kafka', when the kafka metadata
 *         is missing, or when `msg` is not valid JSON (the parser error is
 *         attached as `cause`)
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    kafka?: {
      brokers: string[],
      topic: string,
      group_id: string
    }
  },
  msg: string
) {
  const { kind, kafka } = wm_trigger;

  if (kind !== 'kafka') {
    throw new Error(`Expected kafka trigger kind, got: ${kind}`);
  }
  if (!kafka) {
    // A kafka trigger without metadata is a different failure from a wrong
    // trigger kind, so it gets its own message.
    throw new Error("Kafka trigger is missing its kafka metadata (brokers, topic, group_id)");
  }

  try {
    // Assuming the message received is a JSON value
    const data = JSON.parse(msg);
    return { msg, data, kafka };
  } catch (error) {
    // Message text is unchanged; the original parser error travels as `cause`.
    throw Object.assign(new Error("Failed to parse Kafka message as JSON"), {
      cause: error,
    });
  }
}
/**
 * Main Function - consumes what the preprocessor produced
 *
 * ⚠️ Runs AFTER preprocessor(); its arguments are the preprocessor's return values.
 *
 * @param msg - Message content, unmodified
 * @param data - Result of parsing the Kafka message as JSON
 * @param kafka - Metadata describing the Kafka source (brokers, topic, group_id)
 */
export async function main(
  msg: string,
  data: any,
  kafka: {
    brokers: string[],
    topic: string,
    group_id: string
  }
) {
  // TODO: handle the event.
}
Submitted by dieriba.pro916 435 days ago
/**
 * Kafka Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It unpacks the Kafka metadata from the trigger details and forwards it,
 * together with the untouched message, to `main()`.
 *
 * The returned object determines `main()` parameters:
 * - `{brokers, topic, groupId, message}` → `main(brokers, topic, groupId, message)`
 *
 * @param wm_trigger - Trigger details; `kafka` holds brokers, topic and group_id
 * @param message - Raw trigger data from Kafka
 * @returns The Kafka brokers, topic and group id (as `groupId`) plus the raw `message`
 * @throws Error when the trigger kind is not 'kafka' or the kafka metadata is missing
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    kafka?: {
      brokers: string[];
      topic: string;
      group_id: string;
    };
  },
  message: string
) {
  if (wm_trigger.kind !== 'kafka') {
    throw new Error(`Expected kafka trigger kind, got: ${wm_trigger.kind}`);
  }
  if (!wm_trigger.kafka) {
    // Reported separately: "Expected kafka ..., got: kafka" would be contradictory.
    throw new Error("Kafka trigger is missing its kafka metadata (brokers, topic, group_id)");
  }

  const { brokers, topic, group_id } = wm_trigger.kafka;
  return {
    brokers,
    topic,
    groupId: group_id, // main() takes the camelCase name
    message
  };
}
/**
 * Main Function - consumes what the preprocessor produced
 *
 * ⚠️ Runs AFTER `preprocessor()`; its arguments are the preprocessor's return values.
 *
 * @param brokers - Kafka brokers of the trigger
 * @param topic - Kafka topic of the trigger
 * @param groupId - Kafka consumer group ID of the trigger
 * @param message - Message received from Kafka
 */
export async function main(
  brokers: string[],
  topic: string,
  groupId: string,
  message: string
) {
  // TODO: handle the event.
}
Submitted by dieriba.pro916 435 days ago