Edits history of script submission #22242 for ' Actor Run Trigger (for polling) (apify)'

  • bun
    import * as wmill from 'windmill-client';
    
    import { ActorListSortBy, ApifyClient } from 'apify-client@^2.19.0';
    
    /** Apify API-key credential; `api_key` is one of the two values `createClient` can use as the client token. */
    type ApifyApiKey = {
      api_key: string;
    };
    
    /** Apify OAuth credential; `createClient` looks at `token` before it looks at the API key. */
    type Apify = {
      token: string;
    };
    
    /** Run statuses the trigger can react to; `main` forwards them as the `status` filter when listing runs. */
    type EventType = 'SUCCEEDED' | 'FAILED' | 'TIMED-OUT' | 'ABORTED';
    
    /**
     * Which listing the `actorId` option loader queries:
     * `client.actors().list(...)` sorted by last run start, or `client.store().list(...)`.
     */
    type ActorSource = 'RECENTLY_USED_ACTORS' | 'APIFY_STORE_ACTORS';
    
    /**
     * Type of the `actorId` parameter of `main`.
     * NOTE(review): presumably Windmill's dynamic-select naming convention, pairing this alias
     * with the exported `actorId` function below — confirm against the Windmill docs.
     */
    export type DynSelect_actorId = string;
    /**
     * Loads the selectable Actors for the `actorId` input.
     *
     * @param actorSource `'RECENTLY_USED_ACTORS'` lists via `client.actors().list` (100 items, sorted by
     *   last run start, descending); any other value lists via `client.store().list` (1000 items).
     * @param api_key Optional API-key credential.
     * @param oauth_token Optional OAuth credential.
     * @returns An array of `{ label, value }` options. When no credential is supplied or the listing
     *   fails, a single option with an empty `value` whose `label` carries the explanation.
     */
    export async function actorId(actorSource: ActorSource, api_key?: ApifyApiKey, oauth_token?: Apify) {
      const hasCredentials = Boolean(api_key?.api_key) || Boolean(oauth_token?.token);
      if (!hasCredentials) {
        return [{ value: '', label: 'Missing Apify API key or OAuth token' }];
      }

      const client = createClient(api_key, oauth_token);

      // Label is "Title (username/name)" when the actor has a title, otherwise just "username/name".
      const toOption = (actor: any) => {
        const slug = `${actor.username}/${actor.name}`;
        return {
          label: actor.title ? `${actor.title} (${slug})` : slug,
          value: actor.id,
        };
      };

      try {
        let actors: any[];
        if (actorSource === 'RECENTLY_USED_ACTORS') {
          ({ items: actors } = await client.actors().list({
            limit: 100,
            offset: 0,
            sortBy: ActorListSortBy.LAST_RUN_STARTED_AT,
            desc: true,
          }));
        } else {
          ({ items: actors } = await client.store().list({
            limit: 1000,
            offset: 0,
          }));
        }
        return actors.map(toOption);
      } catch (error: any) {
        // Surface the failure as a (non-selectable-looking) option instead of throwing.
        const reason = error.message || error;
        return [{ value: '', label: `Failed to load actors: ${reason}` }];
      }
    }
    
    /**
     * Builds an `ApifyClient` authenticated with whichever credential is available.
     *
     * The OAuth token is preferred; the API key is used when the OAuth token is absent or empty.
     * Every outgoing request is tagged with the `x-apify-integration-platform: windmill` header.
     *
     * @param api_key Optional API-key credential.
     * @param oauth_token Optional OAuth credential.
     * @returns A configured client.
     * @throws Error when neither credential carries a non-empty token.
     */
    const createClient = (api_key?: ApifyApiKey, oauth_token?: Apify): ApifyClient => {
      // `||` rather than `??`: an empty-string OAuth token must fall through to the API key,
      // matching the truthiness check callers use to decide whether credentials were provided.
      const token = oauth_token?.token || api_key?.api_key;
      if (!token) {
        throw new Error('Missing Apify API key or OAuth token');
      }

      return new ApifyClient({
        token,
        requestInterceptors: [
          (request) => {
            // The request may arrive without a headers object; create one before tagging it.
            if (!request.headers) {
              request.headers = {};
            }
            request.headers['x-apify-integration-platform'] = 'windmill';
            return request;
          },
        ],
      });
    };
    
    /**
     * Order-insensitive comparison of two event-type lists.
     *
     * @returns `false` when either list is missing; otherwise whether the sorted copies
     *   serialize to the same JSON. The inputs themselves are not reordered.
     */
    function areEventTypesEqual(a?: EventType[], b?: EventType[]): boolean {
      if (!(a && b)) {
        return false;
      }

      // Sort a copy so the caller's array keeps its order, then compare the serialized form.
      const canonical = (list: EventType[]): string => {
        const copy = [...list];
        copy.sort();
        return JSON.stringify(copy);
      };

      return canonical(a) === canonical(b);
    }
    // SYNC_BLOCK_END
    
    /** Shape of the value `main` persists with `wmill.setState` and reads back with `wmill.getState`. */
    type TriggerState = {
      // ID of the first (newest) run in the list returned to the previous poll.
      lastRunId?: string;
      // Actor the state was recorded for; `main` re-baselines when it differs from the current input.
      actorId?: string;
      // Event types the state was recorded for; `main` re-baselines when the set differs.
      eventTypes?: EventType[];
    };
    
    /**
     * Polling trigger: returns the Actor runs that appeared since the previous poll.
     *
     * State (`lastRunId`, `actorId`, `eventTypes`) is kept with `wmill.getState` / `wmill.setState`.
     * The first poll, and the first poll after the actor or the event-type set changes, only
     * records a baseline and returns `[]`. The baseline is recorded even when no run exists yet,
     * so the first run that finishes afterwards is reported instead of being swallowed.
     *
     * NOTE(review): the cursor logic assumes a run listed before `lastRunId` is always newer
     * than anything already reported. If the API orders by start time, a run that started
     * earlier but finished later than the cursor run could be skipped — confirm the sort order.
     *
     * @param actorSource Not read by this function; it is only part of the input schema.
     * @param actorId ID of the Actor whose runs are polled.
     * @param eventTypes Run statuses to report; must contain at least one entry.
     * @param runsLimit Maximum number of runs fetched per poll; floored and capped at 1000.
     * @param api_key Optional API-key credential.
     * @param oauth_token Optional OAuth credential.
     * @returns The new runs (newest first, possibly empty), or `{ error }` when the input is
     *   invalid or the request fails.
     */
    export async function main(
      actorSource: ActorSource,
      actorId: DynSelect_actorId,
      eventTypes: EventType[],
      runsLimit: number,
      api_key?: ApifyApiKey,
      oauth_token?: Apify,
    ) {
      // Guard against a missing / non-array value as well as an empty selection.
      if (!Array.isArray(eventTypes) || eventTypes.length === 0) {
        return { error: 'At least one of the Event Types has to be picked.' };
      }

      // Validate runsLimit to be a positive number (floored) and cap it at 1000.
      const requestedLimit = Number(runsLimit);
      const validatedRunsLimit = Number.isFinite(requestedLimit)
        ? Math.min(1000, Math.floor(requestedLimit))
        : NaN;

      // `!(x >= 1)` also rejects NaN.
      if (!(validatedRunsLimit >= 1)) {
        return { error: 'runsLimit must be a positive integer.' };
      }

      try {
        const client = createClient(api_key, oauth_token);

        const { items } = await client.actor(actorId).runs().list({
          limit: validatedRunsLimit,
          offset: 0,
          desc: true,
          status: eventTypes,
        });

        const runs: any[] = items ?? [];
        const newestRunId: string | undefined = runs[0]?.id;

        const state = (await wmill.getState()) as TriggerState | null | undefined;

        // Single place that writes the cursor together with the config it belongs to.
        const saveCursor = (lastRunId: string | undefined) =>
          wmill.setState({
            lastRunId: lastRunId,
            actorId: actorId,
            eventTypes: eventTypes,
          });

        // No state yet, or the state belongs to a different actor / event-type set:
        // record where we are now and report nothing.
        const needsBaseline =
          !state ||
          state.actorId !== actorId ||
          !areEventTypesEqual(state.eventTypes, eventTypes);

        if (needsBaseline) {
          await saveCursor(newestRunId);
          return [];
        }

        // Nothing listed: keep the existing cursor untouched.
        if (runs.length === 0) {
          return [];
        }

        // Everything listed before the previously seen run is new. When the cursor is unset
        // (baseline taken with no runs) or no longer in the page, every listed run is new.
        const lastRunId = state.lastRunId;
        const foundIdx = lastRunId ? runs.findIndex((r) => r?.id === lastRunId) : -1;
        const newerRuns = foundIdx >= 0 ? runs.slice(0, foundIdx) : runs;

        await saveCursor(newestRunId);

        // Can be empty when the newest run is the one already seen (foundIdx === 0).
        return newerRuns;
      } catch (error: unknown) {
        const message = (error as { message?: unknown } | null | undefined)?.message;
        const reason = typeof message === 'string' && message ? message : String(error);
        return {
          error: `Failed to process actor trigger. Reason: ${reason}`,
        };
      }
    }
    

    Submitted by jakub.drobnik222 172 days ago