Edits history of script submission #4552 for ' Sends alert to a slack channel or user on error script or flow error (slack)'

  • python3
    import os
    import requests
    
    
    def main(
        slack_token: str,
        windmill_domain: str,
        channel_or_user_id: str,
        job_id: str,
        path: str,
        is_flow: bool,
        error: dict,
        started_at: str,
        schedule_path: str = None,
    ):
        """
        Send a notification to a Slack channel or user about a job failure in Windmill.
    
        Args:
            slack_token (str): The token to authenticate with Slack.
            windmill_domain (str): The domain of the Windmill instance.
            channel_or_user_id (str): The ID of the Slack channel or user to send the notification to.
            job_id (str): The ID of the job that failed.
            path (str): The path of the job that failed.
            is_flow (bool): Whether the job is a flow or not.
            error (dict): A dictionary containing information about the error. It should have "name" and "message" keys.
            started_at (str): The time the job started at.
            schedule_path (Optional[str], optional): The path of the schedule. Defaults to None.
    
        Returns:
            dict: The response from the Slack API.
        """
        url = "https://slack.com/api/chat.postMessage"
    
        headers = {
            "Authorization": f"Bearer {slack_token}",
            "Content-Type": "application/json",
        }
    
        workspace = os.environ.get("WM_WORKSPACE")
    
        error_name = error["name"]
        error_message = error["message"]
    
        basic_fields = [
            {"type": "mrkdwn", "text": f"*Workspace:*\n{workspace}"},
            {"type": "mrkdwn", "text": f"*Path:*\n{path}"},
            {"type": "mrkdwn", "text": f"*JobId:*\n{job_id}"},
            {"type": "mrkdwn", "text": f"*Flow:*\n{is_flow}"},
            {"type": "mrkdwn", "text": f"*Started At:*\n{started_at}"},
            {
                "type": "mrkdwn",
                "text": f"https://{windmill_domain}/run/{job_id}?workspace={workspace}",
            },
            {"type": "mrkdwn", "text": f"*Error Name:*\n{error_name}"},
            {"type": "mrkdwn", "text": f"*Error Message:*\n{error_message}"},
        ]
    
        if schedule_path:
            basic_fields.append(
                {"type": "mrkdwn", "text": f"*Schedule Path:*\n{schedule_path}"}
            )
    
        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"Windmill Job Failure :rotating_light:",
                    "emoji": True,
                },
            },
            {
                "type": "section",
                "fields": basic_fields,
            },
            {
                "type": "context",
                "elements": [
                    {
                        "type": "plain_text",
                        "text": "Windmill Error Handler Notification",
                        "emoji": True,
                    }
                ],
            },
        ]
    
        payload = {
            "channel": channel_or_user_id,
            "blocks": blocks,
        }
    
        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()
        return response.json()
    

    Submitted by stephan fitzpatrick769 953 days ago