Actor Run Trigger (for polling)

Retrieves the latest runs of a selected Apify Actor and returns their details if they are new (compared to the last poll) and have completed with one of the specified terminal statuses. Intended for use with Windmill's native polling trigger, which handles the periodic execution and state management. The first run always returns an empty array. An empty array is also returned by the first poll after "Actor ID" and/or "Event Types" are changed.

Script· trigger apify

by jakub.drobnik222 · 12/5/2025

  • Submitted by jakub.drobnik222 Bun
    Created 172 days ago
    1
    import * as wmill from 'windmill-client';
    2
    
    
    3
    import { ActorListSortBy, ApifyClient } from 'apify-client@^2.19.0';
    4
    
    
    5
    /** Credentials object holding an Apify API key. */
    type ApifyApiKey = {
      api_key: string;
    };

    /** Credentials object holding an Apify OAuth token. */
    type Apify = {
      token: string;
    };

    /** Terminal run statuses that can be selected as trigger events. */
    type EventType = 'SUCCEEDED' | 'FAILED' | 'TIMED-OUT' | 'ABORTED';

    /** Where the Actor picker takes its options from: the account's Actors or the Apify Store. */
    type ActorSource = 'RECENTLY_USED_ACTORS' | 'APIFY_STORE_ACTORS';

    /** Value type of an option produced by the `actorId` option loader (an Actor ID). */
    export type DynSelect_actorId = string;
    18
    /**
     * Loads the options for the Actor picker.
     *
     * @param actorSource `'RECENTLY_USED_ACTORS'` lists the account's Actors ordered by
     *   last run start (up to 100); any other value lists Actors from the Apify Store
     *   (up to 1000).
     * @param api_key Optional API-key credentials.
     * @param oauth_token Optional OAuth credentials.
     * @returns An array of `{ label, value }` options where `value` is the Actor ID.
     *   Problems are never thrown: missing credentials or a failed request produce a
     *   single placeholder option with an empty `value` and an explanatory `label`.
     */
    export async function actorId(actorSource: ActorSource, api_key?: ApifyApiKey, oauth_token?: Apify) {
      if (!api_key?.api_key && !oauth_token?.token) {
        return [{ value: '', label: 'Missing Apify API key or OAuth token' }];
      }

      // Show the human-readable title when the Actor has one, always followed by
      // the unambiguous `username/name` path.
      const toSelectOption = (actor: any) => ({
        label: actor.title
          ? `${actor.title} (${actor.username}/${actor.name})`
          : `${actor.username}/${actor.name}`,
        value: actor.id,
      });

      try {
        // Client creation lives inside the try so that a credential problem is
        // reported as an option instead of rejecting the returned promise.
        const client = createClient(api_key, oauth_token);

        if (actorSource === 'RECENTLY_USED_ACTORS') {
          const recentActors = await client.actors().list({
            limit: 100,
            offset: 0,
            sortBy: ActorListSortBy.LAST_RUN_STARTED_AT,
            desc: true,
          });
          return recentActors.items.map(toSelectOption);
        }

        const storeActors = await client.store().list({
          limit: 1000,
          offset: 0,
        });
        return storeActors.items.map(toSelectOption);
      } catch (error: unknown) {
        // Null-safe: thrown values are not guaranteed to be Error instances.
        const reason = (error as { message?: unknown } | null | undefined)?.message || error;
        return [{ value: '', label: `Failed to load actors: ${reason}` }];
      }
    }
    58
    
    
    59
    /**
     * Creates an Apify client from whichever credential is available.
     *
     * The OAuth token is preferred; the API key is used when the OAuth token is
     * absent or empty. Every request made through the returned client carries the
     * `x-apify-integration-platform: windmill` header.
     *
     * @param api_key Optional API-key credentials.
     * @param oauth_token Optional OAuth credentials.
     * @returns A configured `ApifyClient`.
     * @throws Error when neither credential contains a non-empty token.
     */
    const createClient = (api_key?: ApifyApiKey, oauth_token?: Apify): ApifyClient => {
      // `||` rather than `??`: an empty-string OAuth token must not shadow a valid
      // API key (callers test the credentials by truthiness as well).
      const token = oauth_token?.token || api_key?.api_key;
      if (!token) {
        throw new Error('Missing Apify API key or OAuth token');
      }

      return new ApifyClient({
        token,
        requestInterceptors: [
          (request) => {
            // The header bag may be missing on the outgoing request config.
            request.headers = request.headers || {};
            request.headers['x-apify-integration-platform'] = 'windmill';
            return request;
          },
        ],
      });
    };
    78
    
    
    79
    /**
     * Order-insensitive comparison of two event-type selections.
     *
     * @param a First selection (may be missing).
     * @param b Second selection (may be missing).
     * @returns `true` only when both arrays are present and contain the same
     *   entries once sorted; a missing array never equals anything. The inputs are
     *   not mutated.
     */
    function areEventTypesEqual(a?: EventType[], b?: EventType[]): boolean {
      if (!a || !b) return false;
      if (a.length !== b.length) return false;

      // Sort copies so the callers' arrays keep their order.
      const left = [...a].sort();
      const right = [...b].sort();
      return left.every((eventType, index) => eventType === right[index]);
    }
    83
    // SYNC_BLOCK_END
    84
    
    
    85
    /**
     * State persisted between polls.
     *
     * `actorId` and `eventTypes` record the configuration the stored `lastRunId`
     * belongs to, so a configuration change can be detected on the next poll.
     */
    type TriggerState = {
      /** ID of the newest run seen by the previous poll; absent when none was seen. */
      lastRunId?: string;
      /** Actor the stored run ID belongs to. */
      actorId?: string;
      /** Event types that were selected when the state was written. */
      eventTypes?: EventType[];
    };
    90
    
    
    91
    /**
     * Polling entry point: reports Actor runs that appeared since the previous poll.
     *
     * Each call lists the newest runs of `actorId` that have one of `eventTypes` as
     * status, compares them with the run ID remembered in the persisted state and
     * returns the runs listed ahead of it. The first poll for a configuration (no
     * stored state, or a different Actor / event-type selection than the stored one)
     * only records a baseline and returns an empty array. The baseline is recorded
     * even when the Actor has no matching runs yet, so that the first run appearing
     * afterwards is reported instead of being mistaken for the baseline.
     *
     * @param actorSource Not read here; it is only consumed by the `actorId` option loader.
     * @param actorId ID of the Actor whose runs are polled.
     * @param eventTypes Run statuses to report; at least one is required.
     * @param runsLimit Maximum number of runs fetched per poll; floored and capped at 1000.
     * @param api_key Optional API-key credentials.
     * @param oauth_token Optional OAuth credentials.
     * @returns The new runs (newest first, possibly empty), or `{ error }` when the
     *   input is invalid or the request/state handling fails. Never throws.
     */
    export async function main(
      actorSource: ActorSource,
      actorId: DynSelect_actorId,
      eventTypes: EventType[],
      runsLimit: number,
      api_key?: ApifyApiKey,
      oauth_token?: Apify,
    ) {
      if (!eventTypes?.length) {
        return { error: 'At least one of the Event Types has to be picked.' };
      }

      // Coerce, floor and cap the limit; anything non-numeric becomes NaN and is rejected below.
      const requestedLimit = Number(runsLimit);
      const validatedRunsLimit = Number.isFinite(requestedLimit)
        ? Math.min(1000, Math.floor(requestedLimit))
        : NaN;

      if (!validatedRunsLimit || validatedRunsLimit < 1) {
        return { error: 'runsLimit must be a positive integer.' };
      }

      try {
        const client = createClient(api_key, oauth_token);

        const { items } = await client.actor(actorId).runs().list({
          limit: validatedRunsLimit,
          offset: 0,
          desc: true,
          status: eventTypes,
        });

        const runs: any[] = items ?? [];
        // Undefined when the Actor has no matching runs yet.
        const newestRunId: string | undefined = runs[0]?.id;

        const state = (await wmill.getState()) as TriggerState | undefined;

        // First poll for this configuration: record the baseline and report nothing.
        // Initialisation is keyed on the stored actorId (not lastRunId) so that a
        // baseline taken while the run list was empty still counts as a baseline.
        if (
          !state?.actorId ||
          state.actorId !== actorId ||
          !areEventTypesEqual(state.eventTypes, eventTypes)
        ) {
          await wmill.setState({
            lastRunId: newestRunId,
            actorId: actorId,
            eventTypes: eventTypes,
          });
          return [];
        }

        // Nothing listed: keep the stored state untouched.
        if (runs.length === 0) {
          return [];
        }

        // Runs listed before the previously seen run are the new ones. When no run
        // was seen before, or the seen run is no longer within the fetched window,
        // every fetched run is reported.
        // NOTE(review): this assumes runs that complete later are always listed
        // ahead of the stored run. If the API orders by start time, a run that
        // started earlier but finished after the last poll would be skipped —
        // confirm against the Apify API ordering.
        const lastRunId = state.lastRunId;
        const foundIdx = lastRunId == null ? -1 : runs.findIndex((run) => run?.id === lastRunId);
        const newerRuns = foundIdx >= 0 ? runs.slice(0, foundIdx) : runs;

        // Remember the newest run for the next poll.
        await wmill.setState({
          lastRunId: newestRunId,
          actorId: actorId,
          eventTypes: eventTypes,
        });

        return newerRuns;
      } catch (error: unknown) {
        // Null-safe: thrown values are not guaranteed to be Error instances.
        const reason = (error as { message?: unknown } | null | undefined)?.message || error;
        return {
          error: `Failed to process actor trigger. Reason: ${reason}`,
        };
      }
    }
    180