/**
 * SQS Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * The keys of the returned object determine main()'s parameters, so this
 * version hands main() three arguments: `msg`, `data` and `message_attributes`.
 *
 * The message body is assumed to be JSON. It is parsed here so that main()
 * receives both the raw text and the decoded value.
 *
 * @param event - SQS trigger event: the raw body in `msg` plus queue metadata
 * @returns `{ msg, data, message_attributes }` to be passed to main()
 * @throws Error when `event.kind` is not 'sqs'
 * @throws Error when `event.msg` is not valid JSON; the parser's own message
 *         is appended so the offending input can be diagnosed
 */
export async function preprocessor(
  event: {
    kind: 'sqs',
    msg: string,
    queue_url: string,
    message_id?: string,
    receipt_handle?: string,
    attributes: Record<string, string>,
    message_attributes?: Record<string, {
      string_value?: string,
      data_type: string
    }>
  },
) {
  // Reject anything that is not an SQS event before touching the payload.
  if (event.kind !== 'sqs') {
    throw new Error(`Expected SQS trigger kind, got: ${event.kind}`);
  }

  // Only the parse itself is guarded, so unrelated failures are never
  // reported as JSON errors.
  let data;
  try {
    // We assume the message is a JSON value
    data = JSON.parse(event.msg);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to parse SQS message as JSON: ${reason}`);
  }

  return {
    msg: event.msg,
    data,
    message_attributes: event.message_attributes
  };
}
/**
 * Main Function - entry point for the processed SQS event.
 *
 * ⚠️ Invoked AFTER preprocessor(), receiving the values it returned.
 *
 * @param msg - Raw message content
 * @param data - Parsed JSON data
 * @param message_attributes - User-defined attributes
 */
export async function main(
  msg: string,
  data: any,
  message_attributes?: Record<
    string,
    { string_value?: string; data_type: string }
  >,
) {
  // Implement the main function logic here
}
// Submitted by hugo697 369 days ago
/**
 * SQS Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It validates the trigger kind, decodes the JSON message body and returns
 * the object whose keys become main()'s parameters
 * (`msg`, `data`, `message_attributes`).
 *
 * @param event - SQS trigger event: the raw body in `msg` plus queue metadata
 * @returns `{ msg, data, message_attributes }` to be passed to main()
 * @throws Error when `event.kind` is not 'sqs'
 * @throws Error when `event.msg` is not valid JSON; the underlying parser
 *         message is appended instead of being discarded
 */
export async function preprocessor(
  event: {
    kind: 'sqs',
    msg: string,
    queue_url: string,
    message_id?: string,
    receipt_handle?: string,
    attributes: Record<string, string>,
    message_attributes?: Record<string, {
      string_value?: string,
      data_type: string
    }>
  },
) {
  if (event.kind === 'sqs') {
    let data;
    try {
      // We assume the message is a JSON value
      data = JSON.parse(event.msg);
    } catch (error) {
      // Keep the parser's explanation so a bad payload can be diagnosed.
      const detail = error instanceof Error ? error.message : String(error);
      throw new Error(`Failed to parse SQS message as JSON: ${detail}`);
    }
    return {
      msg: event.msg,
      data,
      message_attributes: event.message_attributes
    };
  }
  throw new Error(`Expected sqs trigger kind, got: ${event.kind}`);
}
/**
 * Main Function - consumes the output of preprocessor().
 *
 * ⚠️ Runs AFTER preprocessor(); each parameter matches a key of the object
 * the preprocessor returned.
 *
 * @param msg - Raw message content
 * @param data - Parsed JSON data
 * @param message_attributes - User-defined attributes
 */
export async function main(
  msg: string,
  data: any,
  message_attributes?: Record<
    string,
    { string_value?: string; data_type: string }
  >,
) {
  // Implement the main function logic here
}
// Submitted by hugo697 369 days ago
/**
 * SQS Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * It checks that the trigger is an SQS trigger carrying its metadata, decodes
 * the JSON message body, and returns the object whose keys become main()'s
 * parameters (`msg`, `data`, `sqs`).
 *
 * @param wm_trigger - Trigger details; `sqs` holds the queue metadata
 * @param msg - Message received from SQS
 * @returns `{ msg, data, sqs }` to be passed to main()
 * @throws Error when `wm_trigger.kind` is not 'sqs'
 * @throws Error when the kind is 'sqs' but `wm_trigger.sqs` is missing
 * @throws Error when `msg` is not valid JSON (parser detail is appended)
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    sqs?: {
      queue_url: string,
      message_id?: string,
      receipt_handle?: string,
      attributes: Record<string, string>,
      message_attributes?: Record<string, {
        string_value?: string,
        data_type: string
      }>
    }
  },
  msg: string
) {
  if (wm_trigger.kind !== 'sqs') {
    throw new Error(`Expected sqs trigger kind, got: ${wm_trigger.kind}`);
  }

  // Reported separately: saying "got: sqs" here would point at the wrong problem.
  const sqs = wm_trigger.sqs;
  if (!sqs) {
    throw new Error('Expected sqs trigger metadata in wm_trigger.sqs, but it is missing');
  }

  let data;
  try {
    // We assume the message is a JSON value
    data = JSON.parse(msg);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to parse SQS message as JSON: ${reason}`);
  }

  return {
    msg,
    data,
    sqs
  };
}
/**
 * Main Function - entry point for the processed SQS event.
 *
 * ⚠️ Invoked AFTER preprocessor(), receiving the values it returned.
 *
 * @param msg - Raw message content
 * @param data - Parsed JSON data
 * @param sqs - SQS metadata object
 */
export async function main(
  msg: string,
  data: any,
  sqs: {
    queue_url: string;
    message_id?: string;
    receipt_handle?: string;
    attributes: Record<string, string>;
    message_attributes?: Record<
      string,
      { string_value?: string; data_type: string }
    >;
  },
) {
  // Implement the main function logic here
}
// Submitted by dieriba.pro916 435 days ago
/**
 * SQS Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * The returned object determines main() parameters; this version returns
 * `{ msg, data, sqs }`, so main() is called as main(msg, data, sqs).
 *
 * @param wm_trigger - Trigger details; `sqs` holds the queue metadata
 * @param msg - Raw trigger data; treated here as a JSON-encoded string
 * @returns `{ msg, data, sqs }` to be passed to main()
 * @throws Error when `wm_trigger.kind` is not 'sqs'
 * @throws Error when the kind is 'sqs' but `wm_trigger.sqs` is missing
 * @throws Error when `msg` is not valid JSON (parser detail is appended)
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    sqs?: {
      queue_url: string,
      message_id?: string,
      receipt_handle?: string,
      attributes: Record<string, string>,
      message_attributes?: Record<string, {
        string_value?: string,
        data_type: string
      }>
    }
  },
  msg: string
) {
  const { kind, sqs } = wm_trigger;

  if (kind !== 'sqs') {
    throw new Error(`Expected sqs trigger kind, got: ${kind}`);
  }
  if (!sqs) {
    // The kind is correct here, so the message must name the real problem.
    throw new Error('Expected sqs trigger metadata in wm_trigger.sqs, but it is missing');
  }

  let data;
  try {
    // We assume the message is a JSON value
    data = JSON.parse(msg);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to parse SQS message as JSON: ${reason}`);
  }

  return { msg, data, sqs };
}
/**
 * Main Function - consumes the output of preprocessor().
 *
 * ⚠️ Runs AFTER preprocessor(); each parameter matches a key of the object
 * the preprocessor returned.
 *
 * @param msg - Raw message content
 * @param data - Parsed JSON data
 * @param sqs - SQS metadata object
 */
export async function main(
  msg: string,
  data: any,
  sqs: {
    queue_url: string;
    message_id?: string;
    receipt_handle?: string;
    attributes: Record<string, string>;
    message_attributes?: Record<
      string,
      { string_value?: string; data_type: string }
    >;
  },
) {
  // Implement the main function logic here
}
// Submitted by dieriba.pro916 435 days ago
/**
 * SQS Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * The returned object determines main() parameters; this version returns
 * `{ msg, userData, sqs }`, so main() is called as main(msg, userData, sqs).
 *
 * @param wm_trigger - Trigger details; `sqs` holds the queue metadata
 * @param msg - Raw trigger data; treated here as a JSON-encoded string
 * @returns `{ msg, userData, sqs }` to be passed to main()
 * @throws Error when `wm_trigger.kind` is not 'sqs'
 * @throws Error when the kind is 'sqs' but `wm_trigger.sqs` is missing
 * @throws Error when `msg` is not valid JSON (parser detail is appended)
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    sqs?: {
      queue_url: string,
      message_id?: string,
      receipt_handle?: string,
      attributes: Record<string, string>,
      message_attributes?: Record<string, {
        string_value?: string,
        data_type: string
      }>
    }
  },
  msg: string
) {
  if (wm_trigger.kind !== 'sqs') {
    throw new Error(`Expected sqs trigger kind, got: ${wm_trigger.kind}`);
  }
  if (!wm_trigger.sqs) {
    // Separate failure: the kind is right, but the metadata object is absent.
    throw new Error('Expected sqs trigger metadata in wm_trigger.sqs, but it is missing');
  }

  let userData;
  try {
    userData = JSON.parse(msg);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to parse SQS message as JSON: ${reason}`);
  }

  return {
    msg,
    userData,
    sqs: wm_trigger.sqs
  };
}
/**
 * Main Function - entry point for the processed SQS event.
 *
 * ⚠️ Invoked AFTER preprocessor(), receiving the values it returned.
 *
 * @param msg - Raw message content
 * @param userData - Parsed JSON user data
 * @param sqs - SQS metadata object
 */
export async function main(
  msg: string,
  userData: any,
  sqs: {
    queue_url: string;
    message_id?: string;
    receipt_handle?: string;
    attributes: Record<string, string>;
    message_attributes?: Record<
      string,
      { string_value?: string; data_type: string }
    >;
  },
) {
  // Implement the main function logic here
}
// Submitted by dieriba.pro916 435 days ago
/**
 * SQS Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * This version does not decode the message: it forwards the raw body together
 * with the SQS metadata. The returned object determines main() parameters, so
 * main() is called as main(msg, sqs).
 *
 * @param wm_trigger - Trigger details; `sqs` holds the queue metadata
 * @param msg - Raw trigger data, passed through unchanged
 * @returns `{ msg, sqs }` to be passed to main()
 * @throws Error when `wm_trigger.kind` is not 'sqs'
 * @throws Error when the kind is 'sqs' but `wm_trigger.sqs` is missing
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    sqs?: {
      queue_url: string,
      message_id?: string,
      receipt_handle?: string,
      attributes: Record<string, string>,
      message_attributes?: Record<string, {
        string_value?: string,
        data_type: string
      }>
    }
  },
  msg: string
) {
  if (wm_trigger.kind !== 'sqs') {
    throw new Error(`Expected sqs trigger kind, got: ${wm_trigger.kind}`);
  }
  if (!wm_trigger.sqs) {
    // The kind is already 'sqs' here; report the missing metadata instead.
    throw new Error('Expected sqs trigger metadata in wm_trigger.sqs, but it is missing');
  }
  return {
    msg,
    sqs: wm_trigger.sqs
  };
}
/**
 * Main Function - Handles processed trigger events
 *
 * ⚠️ Called AFTER preprocessor(), with its return values.
 *
 * @param msg - Raw message content
 * @param sqs - SQS metadata object
 */
export async function main(
  msg: string,
  sqs: {
    queue_url: string,
    message_id?: string,
    receipt_handle?: string,
    attributes: Record<string, string>,
    message_attributes?: Record<string, { string_value?: string; data_type: string }>
  }
) {
  // Implement the main function logic here
}
// Submitted by dieriba.pro916 435 days ago
/**
 * SQS Trigger Preprocessor
 *
 * ⚠️ This function runs BEFORE the main function.
 *
 * This version flattens the SQS metadata into individual camelCase fields and
 * forwards the raw body unchanged. The returned object determines main()
 * parameters: queueUrl, messageId, receiptHandle, attributes,
 * messageAttributes and msg.
 *
 * @param wm_trigger - Trigger details; `sqs` holds the queue metadata
 * @param msg - Raw trigger data, passed through unchanged
 * @returns Flattened SQS metadata plus `msg`, to be passed to main()
 * @throws Error when `wm_trigger.kind` is not 'sqs'
 * @throws Error when the kind is 'sqs' but `wm_trigger.sqs` is missing
 */
export async function preprocessor(
  wm_trigger: {
    kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
    sqs?: {
      queue_url: string,
      message_id?: string,
      receipt_handle?: string,
      attributes: Record<string, string>,
      message_attributes?: Record<string, {
        string_value?: string,
        data_type: string
      }>
    }
  },
  msg: string
) {
  if (wm_trigger.kind !== 'sqs') {
    throw new Error(`Expected sqs trigger kind, got: ${wm_trigger.kind}`);
  }

  const sqs = wm_trigger.sqs;
  if (!sqs) {
    // The kind is already 'sqs' here; report the missing metadata instead.
    throw new Error('Expected sqs trigger metadata in wm_trigger.sqs, but it is missing');
  }

  return {
    queueUrl: sqs.queue_url,
    messageId: sqs.message_id,
    receiptHandle: sqs.receipt_handle,
    attributes: sqs.attributes,
    messageAttributes: sqs.message_attributes,
    msg
  };
}
/**
 * Main Function - entry point for the processed SQS event.
 *
 * ⚠️ Invoked AFTER preprocessor(), receiving the values it returned.
 *
 * @param queueUrl - URL of the SQS queue
 * @param messageId - Unique message identifier (if available)
 * @param receiptHandle - Receipt handle for message deletion (if available)
 * @param attributes - Message attributes from SQS
 * @param messageAttributes - Custom message attributes (if any)
 * @param msg - Raw message content
 */
export async function main(
  queueUrl: string,
  messageId: string | undefined,
  receiptHandle: string | undefined,
  attributes: Record<string, string>,
  messageAttributes:
    | Record<string, { string_value?: string; data_type: string }>
    | undefined,
  msg: string,
) {
  // Implement the main function logic here
}
// Submitted by dieriba.pro916 435 days ago
// import * as wmill from 'windmill-client'
/**
 * Preprocessor template.
 *
 * Receives the trigger description and returns the arguments object for the
 * runnable. As written it ignores its input and returns an empty object.
 *
 * @param wm_trigger - Trigger details; `sqs` carries queue metadata when present
 * @returns The args to be passed to the runnable (currently none)
 */
export async function preprocessor(
  wm_trigger: {
    kind:
      | 'http'
      | 'email'
      | 'webhook'
      | 'websocket'
      | 'kafka'
      | 'nats'
      | 'postgres'
      | 'sqs';
    sqs?: {
      queue_url: string;
      message_id?: string;
      receipt_handle?: string;
      attributes: Record<string, string>;
      message_attributes?: Record<
        string,
        { string_value?: string; data_type: string }
      >;
    };
  },
  /* your other args */
) {
  return {
    // return the args to be passed to the runnable
  };
}
/**
 * Main function template. It takes no arguments yet and has an empty body.
 */
export async function main(
  /* main function args */
) {
  // Implement the main function logic here
}
// Submitted by dieriba.pro916 455 days ago
// import * as wmill from 'windmill-client'
/**
 * Preprocessor template.
 *
 * Takes the trigger description and produces the arguments object handed to
 * the runnable. The template body does not read its input and returns an
 * empty object.
 *
 * @param wm_trigger - Trigger details; `sqs` carries queue metadata when present
 * @returns The args to be passed to the runnable (currently none)
 */
export async function preprocessor(
  wm_trigger: {
    kind:
      | 'http'
      | 'email'
      | 'webhook'
      | 'websocket'
      | 'kafka'
      | 'nats'
      | 'postgres'
      | 'sqs';
    sqs?: {
      queue_url: string;
      message_id?: string;
      receipt_handle?: string;
      attributes: Record<string, string>;
      message_attributes?: Record<
        string,
        { string_value?: string; data_type: string }
      >;
    };
  },
  /* your other args */
) {
  return {
    // return the args to be passed to the runnable
  };
}
/**
 * Main function template. It takes no arguments yet and has an empty body.
 */
export async function main(
  /* main function args */
) {
  // Implement the main function logic here
}
// Submitted by dieriba.pro916 455 days ago