import os
import requests
# Slack rejects a section block when any single field text exceeds this length.
_SLACK_FIELD_TEXT_LIMIT = 2000
# Seconds to wait on Slack before giving up rather than hanging the handler.
_SLACK_REQUEST_TIMEOUT = 10


def _fit_field(text: str, limit: int = _SLACK_FIELD_TEXT_LIMIT) -> str:
    """Return *text* cut to at most *limit* characters, marking a cut with an ellipsis."""
    if len(text) <= limit:
        return text
    return text[: limit - 1] + "…"


def main(
    slack_token: str,
    windmill_domain: str,
    channel_or_user_id: str,
    job_id: str,
    path: str,
    is_flow: bool,
    error: dict,
    started_at: str,
    schedule_path: str = None,
):
    """
    Send a notification to a Slack channel or user about a job failure in Windmill.

    The workspace shown in the message is read from the ``WM_WORKSPACE``
    environment variable.

    Args:
        slack_token (str): The token to authenticate with Slack.
        windmill_domain (str): The domain of the Windmill instance.
        channel_or_user_id (str): The ID of the Slack channel or user to send the notification to.
        job_id (str): The ID of the job that failed.
        path (str): The path of the job that failed.
        is_flow (bool): Whether the job is a flow or not.
        error (dict): A dictionary containing information about the error. It should have
            "name" and "message" keys; missing keys fall back to placeholder text, and a
            non-dict value is rendered as the message.
        started_at (str): The time the job started at.
        schedule_path (Optional[str], optional): The path of the schedule. Defaults to None.

    Returns:
        dict: The response from the Slack API.

    Raises:
        requests.HTTPError: If Slack answers with an HTTP error status.
        requests.Timeout: If Slack does not answer within the request timeout.
        RuntimeError: If Slack answers with HTTP 200 but reports ``"ok": false``.
    """
    url = "https://slack.com/api/chat.postMessage"
    headers = {
        "Authorization": f"Bearer {slack_token}",
        # Explicit charset avoids Slack's "missing_charset" response warning.
        "Content-Type": "application/json; charset=utf-8",
    }
    workspace = os.environ.get("WM_WORKSPACE")

    # The handler must not crash on an incomplete or unexpected error payload,
    # otherwise the original failure is never reported.
    if isinstance(error, dict):
        error_name = error.get("name", "Unknown error")
        error_message = error.get("message", "No message provided")
    else:
        error_name = "Unknown error"
        error_message = str(error)

    field_texts = [
        f"*Workspace:*\n{workspace}",
        f"*Path:*\n{path}",
        f"*JobId:*\n{job_id}",
        f"*Flow:*\n{is_flow}",
        f"*Started At:*\n{started_at}",
        f"https://{windmill_domain}/run/{job_id}?workspace={workspace}",
        f"*Error Name:*\n{error_name}",
        f"*Error Message:*\n{error_message}",
    ]
    if schedule_path:
        field_texts.append(f"*Schedule Path:*\n{schedule_path}")

    # Long error messages would otherwise make Slack reject the whole message.
    basic_fields = [
        {"type": "mrkdwn", "text": _fit_field(text)} for text in field_texts
    ]

    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "Windmill Job Failure :rotating_light:",
                "emoji": True,
            },
        },
        {
            "type": "section",
            "fields": basic_fields,
        },
        {
            "type": "context",
            "elements": [
                {
                    "type": "plain_text",
                    "text": "Windmill Error Handler Notification",
                    "emoji": True,
                }
            ],
        },
    ]
    payload = {
        "channel": channel_or_user_id,
        # Fallback shown in notifications and by clients that cannot render blocks.
        "text": f"Windmill job failure: {path} ({job_id})",
        "blocks": blocks,
    }

    response = requests.post(
        url, json=payload, headers=headers, timeout=_SLACK_REQUEST_TIMEOUT
    )
    response.raise_for_status()

    # Slack reports most failures (bad token, unknown channel, invalid blocks)
    # as HTTP 200 with "ok": false, so the status code alone is not enough.
    result = response.json()
    if not result.get("ok"):
        raise RuntimeError(
            f"Slack API error: {result.get('error', 'unknown_error')}"
        )
    return result
# Submitted by stephan fitzpatrick769 711 days ago