{"flow":{"id":65,"summary":"Kafka flow with preprocessor template","versions":[230,236,237,241,268],"created_by":"dieriba.pro916","created_at":"2025-03-03T16:47:04.619Z","votes":0,"approved":true,"apps":["windmill"],"value":{"modules":[],"preprocessor_module":{"id":"preprocessor","value":{"tag":"","type":"rawscript","content":"/**\n * General Trigger Preprocessor\n *\n * ⚠️ This function runs BEFORE the main function.\n *\n * It processes raw trigger data (e.g., MQTT, HTTP, SQS, WebSocket, Kafka) before passing it to main().\n * Common tasks:\n * - Convert binary payloads to string/JSON\n * - Extract metadata\n * - Filter messages\n * - Add timestamps/context\n *\n * The returned object determines main() parameters:\n * - {a: 1, b: 2} → main(a, b)\n * - {msg} → main(msg)\n *\n * @param event - Trigger data (e.g., MQTT, HTTP, SQS, WebSocket, Kafka)\n * @returns Processed data for main()\n */\nexport async function preprocessor(\n  event: {\n    kind: 'kafka',\n    payload: string, // base64 encoded payload\n    brokers: string[],\n    topic: string,\n    group_id: string\n  },\n) {\n  if (event.kind === 'kafka') {\n    try {\n      // Assuming the message received is a JSON value\n      const msg = atob(event.payload);\n      const data = JSON.parse(msg);\n\n      return {\n        msg,\n        data,\n      };\n    } catch (error) {\n      throw new Error(\"Failed to parse Kafka message as JSON\");\n    }\n  }\n  \n  throw new Error(`Expected kafka trigger kind, got: ${event.kind}`);\n}","language":"bun","input_transforms":{"event":{"type":"static"}}}}},"schema":{"type":"object","order":["msg","data"],"$schema":"https://json-schema.org/draft/2020-12/schema","required":["msg","data"],"properties":{"msg":{"type":"string","default":"","description":"Decoded Kafka message payload."},"data":{"type":"object","description":"Parsed JSON data from the Kafka message."}}},"description":"","recording":null,"vcreated_at":"2025-05-08T13:10:45.090Z","vcreated_by":"hugo697","comments":[]}}