Edit history of script submission #22241 for 'Task Run Trigger (for polling) (apify)'

  • bun
    import * as wmill from 'windmill-client';
    
    import { ApifyClient } from 'apify-client@^2.19.0';
    
    /**
     * API-key credential accepted by this script.
     * `api_key` is used as the Apify client token as a fallback when no OAuth
     * token is provided.
     * NOTE(review): presumably mirrors a platform resource type of the same name — confirm.
     */
    type ApifyApiKey = {
      api_key: string;
    };
    
    /**
     * OAuth credential accepted by this script.
     * `token` takes precedence over the API key when the Apify client is built.
     * NOTE(review): presumably mirrors a platform resource type of the same name — confirm.
     */
    type Apify = {
      token: string;
    };
    
    /**
     * Run statuses the trigger can listen for. The selected values are passed
     * as the `status` filter when listing the task's runs.
     */
    type EventType = 'SUCCEEDED' | 'FAILED' | 'TIMED-OUT' | 'ABORTED';
    
    /** Value type of the `taskId` argument; its options are produced by `taskId()` below. */
    export type DynSelect_taskId = string;

    /**
     * Produces the selectable options for the `taskId` argument.
     *
     * Never rejects for missing credentials or a failed listing: in both cases a
     * single placeholder option (empty `value`) carrying the explanation is
     * returned instead.
     *
     * @param api_key Optional API-key credential.
     * @param oauth_token Optional OAuth credential.
     * @returns One `{ value, label }` option per task; the label falls back from
     *          the task's title to its name to its id.
     */
    export async function taskId(api_key?: ApifyApiKey, oauth_token?: Apify) {
      const hasCredentials =
        Boolean(api_key?.api_key) || Boolean(oauth_token?.token);

      if (!hasCredentials) {
        return [{ value: '', label: 'Missing Apify API key or OAuth token' }];
      }

      // Maps one Apify task record to a select option.
      const toOption = (task: any) => {
        const label = task.title || task.name || task.id;
        return { value: task.id, label };
      };

      try {
        const listing = await createClient(api_key, oauth_token).tasks().list();
        const tasks = listing?.items ?? [];
        return tasks.map(toOption);
      } catch (error: any) {
        const reason = error.message || error;
        return [{ value: '', label: `Failed to load tasks: ${reason}` }];
      }
    }
    
    /**
     * Builds an ApifyClient from whichever credential is available.
     *
     * The OAuth token is preferred; the API key is the fallback. Every outgoing
     * request is tagged with the `x-apify-integration-platform: windmill` header.
     *
     * @param api_key Optional API-key credential.
     * @param oauth_token Optional OAuth credential.
     * @returns A configured ApifyClient.
     * @throws Error when neither credential carries a non-empty token.
     */
    const createClient = (api_key?: ApifyApiKey, oauth_token?: Apify): ApifyClient => {
      // `||` rather than `??`: an empty-string OAuth token must fall through to
      // the API key instead of shadowing it and triggering the "Missing" error.
      // This matches the truthiness check the callers use for credentials.
      const token = oauth_token?.token || api_key?.api_key;
      if (!token) {
        throw new Error('Missing Apify API key or OAuth token');
      }

      return new ApifyClient({
        token,
        requestInterceptors: [
          (request) => {
            // Headers may be absent on the request config; create them lazily.
            if (!request.headers) {
              request.headers = {};
            }
            request.headers['x-apify-integration-platform'] = 'windmill';
            return request;
          },
        ],
      });
    };
    
    /**
     * Order-insensitive comparison of two event-type lists.
     *
     * Each list is copied, sorted and serialized, so the inputs are never
     * mutated and duplicates count (`['A', 'A']` differs from `['A']`).
     *
     * @returns `true` when both lists are present and hold the same values;
     *          `false` when either list is missing.
     */
    function areEventTypesEqual(a?: EventType[], b?: EventType[]): boolean {
      // Canonical form: sorted copy rendered as a JSON string.
      const canonical = (types: EventType[]): string =>
        JSON.stringify(Array.from(types).sort());

      if (a && b) {
        return canonical(a) === canonical(b);
      }
      return false;
    }
    
    /**
     * Shape of the state this script persists between polls (read with
     * `wmill.getState()`, written with `wmill.setState()`).
     */
    type TriggerState = {
      // ID of the newest run returned by the poll that last saved state.
      lastRunId?: string;
      // Task the saved cursor belongs to; compared with the current argument to detect a change.
      taskId?: string;
      // Event types in effect when the state was saved; compared order-insensitively.
      eventTypes?: EventType[];
    };
    
    /**
     * Polling trigger: returns the task runs that reached one of `eventTypes`
     * since the previous poll.
     *
     * A cursor (`lastRunId`) plus the configuration it belongs to (`taskId`,
     * `eventTypes`) is persisted through `wmill.getState` / `wmill.setState`.
     *
     * - No saved state, or the task / event types differ from the saved ones:
     *   the poll only records a baseline and returns `[]`, so historic runs are
     *   not replayed. The baseline is recorded even when no runs exist yet, so
     *   the first run that appears afterwards is reported instead of being
     *   mistaken for the baseline.
     * - Otherwise every fetched run that is listed before the saved cursor is
     *   returned (newest first). If the cursor is missing or not among the
     *   fetched runs, all fetched runs are returned.
     *
     * @param taskId ID of the Apify task whose runs are polled.
     * @param eventTypes Run statuses to report; at least one is required.
     * @param runsLimit Maximum runs fetched per poll; floored and capped at 1000.
     * @param api_key Optional API-key credential.
     * @param oauth_token Optional OAuth credential.
     * @returns An array of new runs (possibly empty), or `{ error }` when the
     *          input is invalid or the poll fails.
     */
    export async function main(
      taskId: DynSelect_taskId,
      eventTypes: EventType[],
      runsLimit: number,
      api_key?: ApifyApiKey,
      oauth_token?: Apify,
    ) {
      // Optional chaining keeps a missing `eventTypes` argument on the
      // error-object path instead of throwing a TypeError.
      if (!eventTypes?.length) {
        return { error: 'At least one of the Event Types has to be picked.' };
      }

      // runsLimit must be a finite number >= 1; fractions are floored, cap is 1000.
      const requestedLimit = Number(runsLimit);
      if (!Number.isFinite(requestedLimit) || requestedLimit < 1) {
        return { error: 'runsLimit must be a positive integer.' };
      }
      const validatedRunsLimit = Math.min(1000, Math.floor(requestedLimit));

      try {
        const client = createClient(api_key, oauth_token);

        // Fetch the most recent runs matching the selected statuses.
        const { items } = await client.task(taskId).runs().list({
          limit: validatedRunsLimit,
          offset: 0,
          desc: true, // newest first
          status: eventTypes,
        });

        const runs: any[] = items ?? [];
        const newestRunId: string | undefined = runs[0]?.id;

        const state = (await wmill.getState()) as TriggerState | null | undefined;

        // No baseline for this exact configuration yet: record one and emit
        // nothing. This also runs when `runs` is empty, so a run that shows up
        // later is recognised as new.
        if (
          !state ||
          state.taskId !== taskId ||
          !areEventTypesEqual(state.eventTypes, eventTypes)
        ) {
          await wmill.setState({ lastRunId: newestRunId, taskId, eventTypes });
          return [];
        }

        // Baseline exists and nothing matches: nothing to emit, cursor unchanged.
        if (runs.length === 0) {
          return [];
        }

        // Locate the previously processed run. A baseline recorded while no runs
        // existed has no cursor, which means every fetched run is new.
        const lastRunId = state.lastRunId;
        const foundIdx = lastRunId
          ? runs.findIndex((run) => run?.id === lastRunId)
          : -1;

        // Runs listed before the cursor are newer; `slice` leaves `runs` intact.
        const newerRuns = foundIdx >= 0 ? runs.slice(0, foundIdx) : runs;

        // Advance the cursor for the next poll.
        await wmill.setState({ lastRunId: newestRunId, taskId, eventTypes });

        // May be empty when the cursor is still the newest run.
        return newerRuns;
      } catch (error: any) {
        return {
          error: `Failed to process task trigger. Reason: ${
            error?.message || error
          }`,
        };
      }
    }
    

    Submitted by jakub.drobnik222 172 days ago