import * as wmill from 'windmill-client';
import { ApifyClient } from 'apify-client@^2.19.0';
/**
 * Credential object holding an Apify API key.
 * `createClient` uses `api_key` as the client token when no OAuth token is supplied.
 */
type ApifyApiKey = {
api_key: string;
};
/**
 * Credential object holding an Apify OAuth token.
 * `createClient` prefers `token` over the API key when both are supplied.
 */
type Apify = {
token: string;
};
/**
 * Run statuses that can be selected for the trigger.
 * `main` forwards the selection as the `status` filter when listing task runs.
 */
type EventType = 'SUCCEEDED' | 'FAILED' | 'TIMED-OUT' | 'ABORTED';
/**
 * Type of `main`'s `taskId` argument: the id of the Apify task whose runs are listed.
 * NOTE(review): the `DynSelect_` prefix presumably links this type to the exported
 * `taskId` option loader below — confirm against the platform's dynamic-select convention.
 */
export type DynSelect_taskId = string;
/**
 * Loads the selectable options for the `taskId` argument: one entry per Apify
 * task returned by the client's task listing for the supplied credentials.
 *
 * Problems are reported in-band rather than thrown: missing credentials or a
 * failed request yield a single placeholder entry whose `value` is empty and
 * whose `label` explains what went wrong.
 *
 * @param api_key - Optional API-key credentials.
 * @param oauth_token - Optional OAuth credentials.
 * @returns `{ value, label }` entries where `value` is the task id and `label`
 *   is the task's title, falling back to its name and then to its id.
 */
export async function taskId(api_key?: ApifyApiKey, oauth_token?: Apify) {
  if (!api_key?.api_key && !oauth_token?.token) {
    return [{ value: '', label: 'Missing Apify API key or OAuth token' }];
  }
  try {
    const client = createClient(api_key, oauth_token);
    const data = await client.tasks().list();
    const items = data?.items ?? [];
    return items.map((task: any) => ({
      value: task.id,
      label: task.title || task.name || task.id,
    }));
  } catch (error: unknown) {
    // Anything can be thrown, including null/undefined; read `message`
    // defensively so the handler itself can never throw.
    const reason =
      (error as { message?: unknown } | null | undefined)?.message || error;
    return [{ value: '', label: `Failed to load tasks: ${reason}` }];
  }
}
/**
 * Builds an Apify client authenticated with whichever credential is present.
 *
 * The OAuth token takes precedence; the API key is used when the OAuth token
 * is absent or empty. The client is configured with a request interceptor
 * that sets the `x-apify-integration-platform` header to `windmill`.
 *
 * @param api_key - Optional API-key credentials.
 * @param oauth_token - Optional OAuth credentials.
 * @returns A configured `ApifyClient`.
 * @throws Error when neither credential holds a non-empty value.
 */
const createClient = (api_key?: ApifyApiKey, oauth_token?: Apify): ApifyClient => {
  // `||` rather than `??`: an empty-string OAuth token has to fall through to
  // the API key, in line with the truthiness check `taskId` performs up front.
  const token = oauth_token?.token || api_key?.api_key;
  if (!token) {
    throw new Error('Missing Apify API key or OAuth token');
  }
  return new ApifyClient({
    token,
    requestInterceptors: [
      (request) => {
        // Create the headers bag when the request does not have one yet.
        if (!request.headers) {
          request.headers = {};
        }
        request.headers['x-apify-integration-platform'] = 'windmill';
        return request;
      },
    ],
  });
};
/**
 * Order-insensitive comparison of two event-type selections.
 *
 * Both lists are copied, sorted and serialized; they are equal when the
 * serialized forms match. A missing list never equals anything, not even
 * another missing list. Neither input is mutated.
 *
 * @param a - First selection, possibly absent.
 * @param b - Second selection, possibly absent.
 * @returns `true` only when both lists are present and hold the same entries.
 */
function areEventTypesEqual(a?: EventType[], b?: EventType[]): boolean {
  if (a && b) {
    // Sort a copy so the caller's array keeps its order.
    const canonicalize = (selection: EventType[]): string =>
      JSON.stringify([...selection].sort());
    return canonicalize(a) === canonicalize(b);
  }
  return false;
}
/**
 * Shape of the value `main` stores with `wmill.setState` and reads back with
 * `wmill.getState` on the next invocation.
 */
type TriggerState = {
// Id of the newest run in the listing at the time the state was written.
lastRunId?: string;
// Task id the state was recorded for; `main` compares it with the current argument.
taskId?: string;
// Event-type selection the state was recorded for; compared via `areEventTypesEqual`.
eventTypes?: EventType[];
};
/**
 * Polling trigger: returns the runs of an Apify task, filtered to the selected
 * statuses, that appeared since the previous invocation.
 *
 * The newest run id is persisted between invocations together with the task id
 * and the status selection. The first invocation for a given task/selection
 * (or the first one after either changed) only records a baseline and returns
 * an empty array. The baseline is recorded even when the task has no matching
 * runs yet, so the first runs that show up afterwards are reported instead of
 * being mistaken for pre-existing history.
 *
 * @param taskId - Id of the Apify task to watch.
 * @param eventTypes - Run statuses to report; at least one is required.
 * @param runsLimit - Maximum number of runs fetched per invocation; floored to
 *   an integer and capped at 1000.
 * @param api_key - Optional API-key credentials.
 * @param oauth_token - Optional OAuth credentials.
 * @returns The new runs, newest first (possibly empty), or `{ error }` when the
 *   input is invalid or the request/state handling fails.
 */
export async function main(
  taskId: DynSelect_taskId,
  eventTypes: EventType[],
  runsLimit: number,
  api_key?: ApifyApiKey,
  oauth_token?: Apify,
) {
  // Validate event types; `?.` also covers the argument being omitted.
  if (!eventTypes?.length) {
    return { error: 'At least one of the Event Types has to be picked.' };
  }
  // Validate runsLimit to be a positive integer and cap at 1000
  const numericLimit = Number(runsLimit);
  const validatedRunsLimit = Number.isFinite(numericLimit)
    ? Math.min(1000, Math.floor(numericLimit))
    : NaN;
  if (!validatedRunsLimit || validatedRunsLimit < 1) {
    return { error: 'runsLimit must be a positive integer.' };
  }
  try {
    const client = createClient(api_key, oauth_token);
    // Fetch up to `validatedRunsLimit` runs, newest first
    const { items } = await client.task(taskId).runs().list({
      limit: validatedRunsLimit,
      offset: 0,
      desc: true, // newest first
      status: eventTypes,
    });
    const runs: any[] = items ?? [];
    const newestRunId: string | undefined = runs[0]?.id;

    // Get persisted state
    const state = (await wmill.getState()) as TriggerState | null | undefined;
    // A baseline only counts when it was recorded for this exact task and
    // status selection.
    const hasBaseline =
      state != null &&
      state.taskId === taskId &&
      areEventTypesEqual(state.eventTypes, eventTypes);

    if (!hasBaseline) {
      // First invocation or changed configuration: record where we are and
      // report nothing. `lastRunId` stays unset when there are no runs yet.
      await wmill.setState({
        lastRunId: newestRunId,
        taskId: taskId,
        eventTypes,
      });
      return [];
    }

    // Nothing listed: nothing to report and nothing to advance.
    if (runs.length === 0) {
      return [];
    }

    // Locate the previously processed run; with an empty baseline every
    // listed run is new.
    const lastRunId = state?.lastRunId;
    const foundIdx = lastRunId
      ? runs.findIndex((run) => run?.id === lastRunId)
      : -1;
    // If found, keep only the newer runs to its left; otherwise keep all
    const newerRuns = foundIdx >= 0 ? runs.slice(0, foundIdx) : runs;

    // Persist the newest run ID + metadata for next trigger
    await wmill.setState({
      lastRunId: newestRunId,
      taskId: taskId,
      eventTypes,
    });
    // Can be empty when the previously processed run is still the newest
    return newerRuns;
  } catch (error: unknown) {
    const reason =
      (error as { message?: unknown } | null | undefined)?.message || error;
    return {
      error: `Failed to process task trigger. Reason: ${reason}`,
    };
  }
}
// Submitted by jakub.drobnik222 11 days ago