import * as wmill from 'windmill-client';
import { ActorListSortBy, ApifyClient } from 'apify-client@^2.19.0';
type ApifyApiKey = {
api_key: string;
};
type Apify = {
token: string;
};
type EventType = 'SUCCEEDED' | 'FAILED' | 'TIMED-OUT' | 'ABORTED';
type ActorSource = 'RECENTLY_USED_ACTORS' | 'APIFY_STORE_ACTORS';
export type DynSelect_actorId = string;
export async function actorId(actorSource: ActorSource, api_key?: ApifyApiKey, oauth_token?: Apify) {
if (!api_key?.api_key && !oauth_token?.token) {
return [{ value: '', label: 'Missing Apify API key or OAuth token' }];
}
const client = createClient(api_key, oauth_token);
const mapActorToSelectOption = (actor: any) => {
const optionName = actor.title
? `${actor.title} (${actor.username}/${actor.name})`
: `${actor.username}/${actor.name}`;
return {
label: optionName,
value: actor.id,
};
};
try {
if (actorSource === 'RECENTLY_USED_ACTORS') {
const recentActors = await client.actors().list({
limit: 100,
offset: 0,
sortBy: ActorListSortBy.LAST_RUN_STARTED_AT,
desc: true,
});
return recentActors.items.map(mapActorToSelectOption);
}
const storeActors = await client.store().list({
limit: 1000,
offset: 0,
});
return storeActors.items.map(mapActorToSelectOption);
} catch (error: any) {
return [
{ value: '', label: `Failed to load actors: ${error.message || error}` },
];
}
}
const createClient = (api_key?: ApifyApiKey, oauth_token?: Apify): ApifyClient => {
const token = oauth_token?.token ?? api_key?.api_key;
if (!token) {
throw new Error('Missing Apify API key or OAuth token');
}
return new ApifyClient({
token: token,
requestInterceptors: [
(request) => {
if (!request.headers) {
request.headers = {};
}
request.headers['x-apify-integration-platform'] = 'windmill';
return request;
},
],
});
};
function areEventTypesEqual(a?: EventType[], b?: EventType[]): boolean {
if (!a || !b) return false;
return JSON.stringify([...a].sort()) === JSON.stringify([...b].sort());
}
// SYNC_BLOCK_END
type TriggerState = {
lastRunId?: string;
actorId?: string;
eventTypes?: EventType[];
};
export async function main(
actorSource: ActorSource,
actorId: DynSelect_actorId,
eventTypes: EventType[],
runsLimit: number,
api_key?: ApifyApiKey,
oauth_token?: Apify,
) {
if (!eventTypes.length) {
return { error: 'At least one of the Event Types has to be picked.' };
}
// Validate runsLimit to be a positive integer and cap at 1000
const validatedRunsLimit = Number.isFinite(Number(runsLimit))
? Math.min(1000, Math.floor(Number(runsLimit)))
: NaN;
if (!validatedRunsLimit || validatedRunsLimit < 1) {
return { error: 'runsLimit must be a positive integer.' };
}
try {
const client = createClient(api_key, oauth_token);
const { items } = await client.actor(actorId).runs().list({
limit: validatedRunsLimit,
offset: 0,
desc: true,
status: eventTypes,
});
const runs: any[] = items ?? [];
// If nothing returned, skip
if (runs.length === 0) {
return [];
}
// Get persisted state
const state = (await wmill.getState()) as TriggerState | undefined;
const newestRunId: string = runs[0]?.id;
// First-ever run: only persist and return empty array
if (!state?.lastRunId) {
await wmill.setState({
lastRunId: newestRunId,
actorId: actorId,
eventTypes: eventTypes,
});
return [];
}
// Check if actor or event types changed since last time
const actorChanged = state.actorId !== actorId;
const eventTypesChanged = !areEventTypesEqual(state.eventTypes, eventTypes);
// Initial run should only persist the last run's ID
if (actorChanged || eventTypesChanged) {
await wmill.setState({
lastRunId: newestRunId,
actorId: actorId,
eventTypes: eventTypes,
});
return [];
}
// Find index of the previously processed run
const lastRunId = state.lastRunId;
const foundIdx = runs.findIndex((r) => r?.id === lastRunId);
// If found, return only the newer runs to the left; otherwise return all
const newerRuns = foundIdx >= 0 ? runs.splice(0, foundIdx) : runs;
// Persist the newest run ID + metadata for next trigger
await wmill.setState({
lastRunId: newestRunId,
actorId: actorId,
eventTypes: eventTypes,
});
// Return array of newer runs (which can be empty if foundIdx is 0)
return newerRuns;
} catch (error: any) {
return {
error: `Failed to process actor trigger. Reason: ${error.message || error
}`,
};
}
}
Submitted by jakub.drobnik222 11 days ago