1 | import requests |
2 | import json |
3 | import os |
4 | import wmill |
5 |
|
6 | def main(message): |
7 | # Change to the location of your teams webhook secret variable - include the full URL |
8 | # Ex: webhook_secret = wmill.get_variable("f/mysecrets/mywebhook") |
9 | webhook_secret = wmill.get_variable("") |
10 | |
11 | # Microsoft Teams webhook URL |
12 | webhook_url = webhook_secret |
13 |
|
14 | # Prepare the adaptive card content for a flow |
15 | flow_url = f"{os.environ.get('WM_BASE_URL')}/run/{os.environ.get('WM_ROOT_FLOW_JOB_ID')}?workspace={os.environ.get('WM_WORKSPACE')}" |
16 | adaptive_card_content = { |
17 | "type": "AdaptiveCard", |
18 | "version": "1.2", |
19 | "body": [ |
20 | { |
21 | "type": "TextBlock", |
22 | "size": "Medium", |
23 | "weight": "Bolder", |
24 | "text": f"Windmill Flow Failure: {os.environ.get('WM_FLOW_PATH')}" |
25 | }, |
26 | { |
27 | "type": "ColumnSet", |
28 | "columns": [ |
29 | { |
30 | "type": "Column", |
31 | "items": [ |
32 | {"type": "TextBlock", "text": "Workspace", "weight": "Bolder"}, |
33 | {"type": "TextBlock", "text": "Flow Path", "weight": "Bolder"}, |
34 | {"type": "TextBlock", "text": "Executor", "weight": "Bolder"}, |
35 | {"type": "TextBlock", "text": "Permissioned As", "weight": "Bolder"}, |
36 | {"type": "TextBlock", "text": "Status", "weight": "Bolder"}, |
37 | {"type": "TextBlock", "text": "Message", "weight": "Bolder"} |
38 | ], |
39 | "width": "auto" |
40 | }, |
41 | { |
42 | "type": "Column", |
43 | "items": [ |
44 | {"type": "TextBlock", "text": os.environ.get('WM_WORKSPACE'), "wrap": True}, |
45 | {"type": "TextBlock", "text": os.environ.get('WM_FLOW_PATH'), "wrap": True}, |
46 | {"type": "TextBlock", "text": f"{os.environ.get('WM_USERNAME')} ({os.environ.get('WM_EMAIL')})", "wrap": True}, |
47 | {"type": "TextBlock", "text": os.environ.get('WM_PERMISSIONED_AS'), "wrap": True}, |
48 | {"type": "TextBlock", "text": "Failure", "wrap": True, "color": "Attention"}, |
49 | {"type": "TextBlock", "text": f"Flow Error: {message}", "wrap": True} |
50 | ], |
51 | "width": "stretch" |
52 | } |
53 | ] |
54 | }, |
55 | { |
56 | "type": "ActionSet", |
57 | "actions": [ |
58 | { |
59 | "type": "Action.OpenUrl", |
60 | "title": "View Run", |
61 | "url": flow_url |
62 | } |
63 | ] |
64 | } |
65 | ] |
66 | } |
67 |
|
68 | # Send the POST request to the Microsoft Teams webhook |
69 | response = requests.post( |
70 | webhook_url, |
71 | headers={"Content-Type": "application/vnd.microsoft.card.adaptive"}, |
72 | data=json.dumps({"type": "message", "attachments": [{"contentType": "application/vnd.microsoft.card.adaptive", "content": adaptive_card_content}]}) |
73 | ) |
74 |
|
75 | # Check for success |
76 | if response.status_code == 200: |
77 | print(f"Failure notification sent to Teams successfully for job: {os.environ.get('WM_ROOT_FLOW_JOB_ID')} {os.environ.get('WM_FLOW_PATH')}") |
78 | else: |
79 | print(f"Failed to send failure notification to Teams for job ID: {os.environ.get('WM_ROOT_FLOW_JOB_ID')}. Response: {response.status_code} - {response.text}") |
80 |
|