0

New Completed Job Run

by
Published 4 days ago

Emits job runs that have reached a terminal state since the last poll, tracked via Windmill state. Optionally restrict to a single job.

Scriptยท trigger databricks Verified

The script

Submitted by hugo989 Typescript (fetch-only)
Verified 5 days ago
1
//native
2

3
import * as wmill from "windmill-client"
4

5
/**
6
 * New Completed Job Run
7
 * Emits job runs that have reached a terminal state since the last poll, tracked via Windmill state. Optionally restrict to a single job.
8
 */
9
export async function main(auth: RT.Databricks, job_id: number | undefined) {
10
  const base = auth.workspace_url.replace(/\/$/, "")
11
  const lastChecked: number = (await wmill.getState()) ?? 0
12

13
  const url = new URL(`${base}/api/2.2/jobs/runs/list`)
14
  url.searchParams.append("completed_only", "true")
15
  url.searchParams.append("limit", "25")
16
  if (job_id !== undefined) url.searchParams.append("job_id", String(job_id))
17

18
  const response = await fetch(url, {
19
    method: "GET",
20
    headers: {
21
      Authorization: `Bearer ${auth.token}`,
22
      Accept: "application/json",
23
    },
24
  })
25

26
  if (!response.ok) {
27
    throw new Error(`${response.status} ${await response.text()}`)
28
  }
29

30
  const data = await response.json()
31
  const runs: Array<{ end_time?: number; start_time?: number }> =
32
    data.runs ?? []
33

34
  const ts = (r: { end_time?: number; start_time?: number }) =>
35
    r.end_time || r.start_time || 0
36
  const maxTs = runs.reduce((m, r) => Math.max(m, ts(r)), 0)
37

38
  // First run: set the watermark to the newest completion seen and don't emit a backlog.
39
  if (!lastChecked) {
40
    await wmill.setState(maxTs)
41
    return []
42
  }
43

44
  const fresh = runs
45
    .filter((r) => ts(r) > lastChecked)
46
    .sort((a, b) => ts(a) - ts(b))
47

48
  if (maxTs > lastChecked) {
49
    await wmill.setState(maxTs)
50
  }
51

52
  return fresh
53
}
54