import requests
import json
import os
import wmill
def main(message):
# Change to the location of your teams webhook secret variable - include the full URL
# Ex: webhook_secret = wmill.get_variable("f/mysecrets/mywebhook")
webhook_secret = wmill.get_variable("")
# Microsoft Teams webhook URL
webhook_url = webhook_secret
# Prepare the adaptive card content for a flow
flow_url = f"{os.environ.get('WM_BASE_URL')}/run/{os.environ.get('WM_ROOT_FLOW_JOB_ID')}?workspace={os.environ.get('WM_WORKSPACE')}"
adaptive_card_content = {
"type": "AdaptiveCard",
"version": "1.2",
"body": [
{
"type": "TextBlock",
"size": "Medium",
"weight": "Bolder",
"text": f"Windmill Flow Failure: {os.environ.get('WM_FLOW_PATH')}"
},
{
"type": "ColumnSet",
"columns": [
{
"type": "Column",
"items": [
{"type": "TextBlock", "text": "Workspace", "weight": "Bolder"},
{"type": "TextBlock", "text": "Flow Path", "weight": "Bolder"},
{"type": "TextBlock", "text": "Executor", "weight": "Bolder"},
{"type": "TextBlock", "text": "Permissioned As", "weight": "Bolder"},
{"type": "TextBlock", "text": "Status", "weight": "Bolder"},
{"type": "TextBlock", "text": "Message", "weight": "Bolder"}
],
"width": "auto"
},
{
"type": "Column",
"items": [
{"type": "TextBlock", "text": os.environ.get('WM_WORKSPACE'), "wrap": True},
{"type": "TextBlock", "text": os.environ.get('WM_FLOW_PATH'), "wrap": True},
{"type": "TextBlock", "text": f"{os.environ.get('WM_USERNAME')} ({os.environ.get('WM_EMAIL')})", "wrap": True},
{"type": "TextBlock", "text": os.environ.get('WM_PERMISSIONED_AS'), "wrap": True},
{"type": "TextBlock", "text": "Failure", "wrap": True, "color": "Attention"},
{"type": "TextBlock", "text": f"Flow Error: {message}", "wrap": True}
],
"width": "stretch"
}
]
},
{
"type": "ActionSet",
"actions": [
{
"type": "Action.OpenUrl",
"title": "View Run",
"url": flow_url
}
]
}
]
}
# Send the POST request to the Microsoft Teams webhook
response = requests.post(
webhook_url,
headers={"Content-Type": "application/vnd.microsoft.card.adaptive"},
data=json.dumps({"type": "message", "attachments": [{"contentType": "application/vnd.microsoft.card.adaptive", "content": adaptive_card_content}]})
)
# Check for success
if response.status_code == 200:
print(f"Failure notification sent to Teams successfully for job: {os.environ.get('WM_ROOT_FLOW_JOB_ID')} {os.environ.get('WM_FLOW_PATH')}")
else:
print(f"Failed to send failure notification to Teams for job ID: {os.environ.get('WM_ROOT_FLOW_JOB_ID')}. Response: {response.status_code} - {response.text}")
Submitted by terrance arroyo955 324 days ago