Task Run Trigger (for polling)

Retrieves the latest runs of a selected Apify Task and returns their details if they are new runs (compared to last poll) and has completed with one of the specified terminal statuses. Intended for use with Windmill's native polling trigger, which handles the periodic execution and state management. First run always returns an empty array. An empty array is also returned after "Task ID" and/or "Event Types" are changed.

Script· trigger apify

by jakub.drobnik222 · 12/5/2025

  • Submitted by jakub.drobnik222 Bun
    Created 172 days ago
    1
    import * as wmill from 'windmill-client';
    2
    
    
    3
    import { ApifyClient } from 'apify-client@^2.19.0';
    4
    
    
    5
    type ApifyApiKey = {
    6
      api_key: string;
    7
    };
    8
    
    
    9
    type Apify = {
    10
      token: string;
    11
    };
    12
    
    
    13
    type EventType = 'SUCCEEDED' | 'FAILED' | 'TIMED-OUT' | 'ABORTED';
    14
    
    
    15
    export type DynSelect_taskId = string;
    16
    export async function taskId(api_key?: ApifyApiKey, oauth_token?: Apify) {
    17
      if (!api_key?.api_key && !oauth_token?.token) {
    18
        return [{ value: '', label: 'Missing Apify API key or OAuth token' }];
    19
      }
    20
    
    
    21
      try {
    22
        const client = createClient(api_key, oauth_token);
    23
    
    
    24
        const data = await client.tasks().list();
    25
        const items = data?.items ?? [];
    26
    
    
    27
        return items.map((task: any) => ({
    28
          value: task.id,
    29
          label: task.title || task.name || task.id,
    30
        }));
    31
      } catch (error: any) {
    32
        return [
    33
          { value: '', label: `Failed to load tasks: ${error.message || error}` },
    34
        ];
    35
      }
    36
    }
    37
    
    
    38
    const createClient = (api_key?: ApifyApiKey, oauth_token?: Apify): ApifyClient => {
    39
      const token = oauth_token?.token ?? api_key?.api_key;
    40
      if (!token) {
    41
        throw new Error('Missing Apify API key or OAuth token');
    42
      }
    43
    
    
    44
      return new ApifyClient({
    45
        token: token,
    46
        requestInterceptors: [
    47
          (request) => {
    48
            if (!request.headers) {
    49
              request.headers = {};
    50
            }
    51
            request.headers['x-apify-integration-platform'] = 'windmill';
    52
            return request;
    53
          },
    54
        ],
    55
      });
    56
    };
    57
    
    
    58
    function areEventTypesEqual(a?: EventType[], b?: EventType[]): boolean {
    59
      if (!a || !b) return false;
    60
      return JSON.stringify([...a].sort()) === JSON.stringify([...b].sort());
    61
    }
    62
    
    
    63
    type TriggerState = {
    64
      lastRunId?: string;
    65
      taskId?: string;
    66
      eventTypes?: EventType[];
    67
    };
    68
    
    
    69
    export async function main(
    70
      taskId: DynSelect_taskId,
    71
      eventTypes: EventType[],
    72
      runsLimit: number,
    73
      api_key?: ApifyApiKey,
    74
      oauth_token?: Apify,
    75
    ) {
    76
      // Validate event types
    77
      if (!eventTypes.length) {
    78
        return { error: 'At least one of the Event Types has to be picked.' };
    79
      }
    80
    
    
    81
      // Validate runsLimit to be a positive integer and cap at 1000
    82
      const validatedRunsLimit = Number.isFinite(Number(runsLimit))
    83
        ? Math.min(1000, Math.floor(Number(runsLimit)))
    84
        : NaN;
    85
    
    
    86
      if (!validatedRunsLimit || validatedRunsLimit < 1) {
    87
        return { error: 'runsLimit must be a positive integer.' };
    88
      }
    89
    
    
    90
      try {
    91
        const client = createClient(api_key, oauth_token);
    92
    
    
    93
        // Fetch runs using the provided runsLimit (can return multiple)
    94
        const { items } = await client.task(taskId).runs().list({
    95
          limit: validatedRunsLimit,
    96
          offset: 0,
    97
          desc: true, // newest first
    98
          status: eventTypes,
    99
        });
    100
    
    
    101
        // Fetched runs array
    102
        const runs: any[] = items ?? [];
    103
    
    
    104
        // If nothing returned, skip
    105
        if (runs.length === 0) {
    106
          return [];
    107
        }
    108
    
    
    109
        // Get persisted state
    110
        const state = (await wmill.getState()) as TriggerState | undefined;
    111
        const newestRunId: string = runs[0]?.id;
    112
    
    
    113
        // First-ever run: only persist and return empty array
    114
        if (!state?.lastRunId) {
    115
          await wmill.setState({
    116
            lastRunId: newestRunId,
    117
            taskId: taskId,
    118
            eventTypes,
    119
          });
    120
          return [];
    121
        }
    122
    
    
    123
        // Check if task or event types changed since last time
    124
        const taskChanged = state.taskId !== taskId;
    125
        const eventTypesChanged = !areEventTypesEqual(state.eventTypes, eventTypes);
    126
    
    
    127
        if (taskChanged || eventTypesChanged) {
    128
          // Reset lastRunId to the newest and persist new metadata; return nothing
    129
          await wmill.setState({
    130
            lastRunId: newestRunId,
    131
            taskId: taskId,
    132
            eventTypes,
    133
          });
    134
          return [];
    135
        }
    136
    
    
    137
        // Find index of the previously processed run
    138
        const lastRunId = state.lastRunId;
    139
        const foundIdx = runs.findIndex((run) => run?.id === lastRunId);
    140
    
    
    141
        // If found, return only the newer runs to the left; otherwise return all
    142
        const newerRuns = foundIdx >= 0 ? runs.splice(0, foundIdx) : runs;
    143
    
    
    144
        // Persist the newest run ID + metadata for next trigger
    145
        await wmill.setState({
    146
          lastRunId: newestRunId,
    147
          taskId: taskId,
    148
          eventTypes,
    149
        });
    150
    
    
    151
        // Return array of newer runs (which can be empty if foundIdx is 0)
    152
        return newerRuns;
    153
      } catch (error: any) {
    154
        return {
    155
          error: `Failed to process task trigger. Reason: ${
    156
            error?.message || error
    157
          }`,
    158
        };
    159
      }
    160
    }
    161