0

Sends an alert to a Slack channel or user when a script or flow fails

by
Published Nov 13, 2023

This script sends a message to a Slack channel or user of your choosing in the event of a job failure. It is written in Python and uses the requests library. It uses Slack blocks for rich text and styling.

Script · failure slack
  • Submitted by stephan fitzpatrick769 Python3
    Created 953 days ago
    1
    import os
    2
    import requests
    3
    
    
    4
    
    
    5
    def main(
        slack_token: str,
        windmill_domain: str,
        channel_or_user_id: str,
        job_id: str,
        path: str,
        is_flow: bool,
        error: dict,
        started_at: str,
        schedule_path: str = None,
    ) -> dict:
        """
        Send a notification to a Slack channel or user about a job failure in Windmill.

        The message is posted through Slack's ``chat.postMessage`` endpoint as a
        header block, a section block holding one field per job detail, and a
        context footer.

        Args:
            slack_token (str): The token to authenticate with Slack.
            windmill_domain (str): The domain of the Windmill instance.
            channel_or_user_id (str): The ID of the Slack channel or user to send the notification to.
            job_id (str): The ID of the job that failed.
            path (str): The path of the job that failed.
            is_flow (bool): Whether the job is a flow or not.
            error (dict): A dictionary containing information about the error. It should have
                "name" and "message" keys; a missing key is reported as "Unknown".
            started_at (str): The time the job started at.
            schedule_path (Optional[str], optional): The path of the schedule. Defaults to None.

        Returns:
            dict: The response from the Slack API.

        Raises:
            requests.HTTPError: If Slack answers with a non-2xx HTTP status.
            requests.Timeout: If Slack does not answer within the timeout.
            RuntimeError: If Slack answers with a payload whose "ok" flag is not true.
        """
        url = "https://slack.com/api/chat.postMessage"
        # Slack rejects the whole message (invalid_blocks) when a section field's
        # text is longer than this, so long values are clipped rather than dropped.
        max_field_chars = 2000
        # Without a timeout a stalled connection would block the handler forever.
        request_timeout_seconds = 30

        def clip(text: str) -> str:
            """Return text shortened to max_field_chars, ending in '...' if cut."""
            if len(text) <= max_field_chars:
                return text
            return text[: max_field_chars - 3] + "..."

        headers = {
            "Authorization": f"Bearer {slack_token}",
            "Content-Type": "application/json",
        }

        workspace = os.environ.get("WM_WORKSPACE")

        # A partial error dict must not stop the alert from being sent.
        error_name = error.get("name", "Unknown")
        error_message = error.get("message", "Unknown")

        field_texts = [
            f"*Workspace:*\n{workspace}",
            f"*Path:*\n{path}",
            f"*JobId:*\n{job_id}",
            f"*Flow:*\n{is_flow}",
            f"*Started At:*\n{started_at}",
            f"https://{windmill_domain}/run/{job_id}?workspace={workspace}",
            f"*Error Name:*\n{error_name}",
            f"*Error Message:*\n{error_message}",
        ]

        if schedule_path:
            field_texts.append(f"*Schedule Path:*\n{schedule_path}")

        basic_fields = [{"type": "mrkdwn", "text": clip(text)} for text in field_texts]

        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": "Windmill Job Failure :rotating_light:",
                    "emoji": True,
                },
            },
            {
                "type": "section",
                "fields": basic_fields,
            },
            {
                "type": "context",
                "elements": [
                    {
                        "type": "plain_text",
                        "text": "Windmill Error Handler Notification",
                        "emoji": True,
                    }
                ],
            },
        ]

        payload = {
            "channel": channel_or_user_id,
            "blocks": blocks,
        }

        response = requests.post(
            url, json=payload, headers=headers, timeout=request_timeout_seconds
        )
        response.raise_for_status()

        result = response.json()
        # Slack reports API-level failures (bad token, unknown channel, invalid
        # blocks) with HTTP 200 and "ok": false, so the status code alone is not
        # enough to know the alert was delivered.
        if not result.get("ok"):
            raise RuntimeError(
                f"Slack API error: {result.get('error', 'unknown_error')}"
            )
        return result
    97