//native
// Handles Google push notification payloads (Drive & Calendar watch channels).
// Dispatches to the right handler based on the resource_uri in the payload.
import * as wmill from "windmill-client";
// Change this to point to your Google Workspace OAuth resource
const GWORKSPACE_RESOURCE_PATH = "<path/to/resource>";
const GOOGLE_DRIVE_API = "https://www.googleapis.com/drive/v3";
const GOOGLE_CALENDAR_API = "https://www.googleapis.com/calendar/v3";
// Mirrors the headers sent by Google push notifications (body is always empty)
type GoogleTriggerPayload = {
channel_id: string;
resource_id: string;
resource_state: string; // "sync" | "exists" | "not_exists" | "update"
resource_uri: string;
message_number: string;
channel_expiration: string;
channel_token: string; // custom token set when creating the watch, for verification
changed: string; // Drive-only: comma-separated list (e.g. "content,properties,permissions")
};
// Routes the notification to the appropriate handler: Drive (whole), file-specific, or Calendar
export async function main(payload: GoogleTriggerPayload) {
// Google sends a "sync" event when a watch channel is first created — just acknowledge it
if (payload.resource_state === "sync") {
return {
status: "sync",
message: "Initial sync notification acknowledged",
};
}
const gworkspace: RT.Gworkspace = await wmill.getResource(
GWORKSPACE_RESOURCE_PATH,
);
const token = gworkspace.token;
const headers = {
Authorization: `Bearer ${token}`,
"Content-Type": "application/json",
};
// Detect the type of watch based on the resource URI
const resourceUri = payload.resource_uri;
if (resourceUri.includes("googleapis.com/calendar")) {
return await handleCalendarChange(payload, headers);
} else if (
resourceUri.includes("googleapis.com/drive") &&
resourceUri.includes("/files/")
) {
return await handleFileChange(payload, headers);
} else if (resourceUri.includes("googleapis.com/drive")) {
return await handleDriveChange(payload, headers);
}
return {
status: "unknown",
message: `Unrecognized resource URI: ${resourceUri}`,
payload,
};
}
// Whole-drive watch: uses the Changes API with a persisted page token to fetch incremental changes
async function handleDriveChange(
payload: GoogleTriggerPayload,
headers: Record<string, string>,
) {
const changedTypes = payload.changed
? payload.changed.split(",").map((s) => s.trim())
: [];
const results: Record<string, any> = {
type: "drive_watch",
resource_state: payload.resource_state,
changed_types: changedTypes,
changes: [],
};
// Get the start page token from state, or initialize
let startPageToken: string | undefined;
const state = await wmill.getState();
if (state?.startPageToken) {
startPageToken = state.startPageToken;
}
// First run: no token yet, initialize and return early
if (!startPageToken) {
const tokenRes = await fetch(`${GOOGLE_DRIVE_API}/changes/startPageToken`, {
headers,
});
const tokenData = await tokenRes.json();
await wmill.setState({ startPageToken: tokenData.startPageToken });
results.message = "Initialized change tracking, no previous token found";
return results;
}
// Paginate through all changes since last run
let pageToken: string | undefined = startPageToken;
while (pageToken) {
const params = new URLSearchParams({
pageToken,
fields:
"nextPageToken,newStartPageToken,changes(fileId,removed,time,file(id,name,mimeType,modifiedTime,trashed,webViewLink))",
pageSize: "100",
});
const res = await fetch(`${GOOGLE_DRIVE_API}/changes?${params}`, {
headers,
});
const data = await res.json();
if (data.changes) {
results.changes.push(...data.changes);
}
if (data.newStartPageToken) {
await wmill.setState({ startPageToken: data.newStartPageToken });
}
pageToken = data.nextPageToken;
}
return results;
}
// Single-file watch: fetches the latest file metadata and recent revisions
async function handleFileChange(
payload: GoogleTriggerPayload,
headers: Record<string, string>,
) {
const fileIdMatch = payload.resource_uri.match(/\/files\/([^?/]+)/);
const fileId = fileIdMatch?.[1];
if (!fileId) {
return {
type: "file_watch",
status: "error",
message: `Could not extract file ID from URI: ${payload.resource_uri}`,
};
}
const params = new URLSearchParams({
fields:
"id,name,mimeType,modifiedTime,lastModifyingUser,size,trashed,version,webViewLink",
});
const res = await fetch(`${GOOGLE_DRIVE_API}/files/${fileId}?${params}`, {
headers,
});
const file = await res.json();
let revisions: any[] = [];
try {
const revRes = await fetch(
`${GOOGLE_DRIVE_API}/files/${fileId}/revisions?fields=revisions(id,modifiedTime,lastModifyingUser,size)&pageSize=5`,
{ headers },
);
const revData = await revRes.json();
revisions = revData.revisions ?? [];
} catch {
// Some file types don't support revisions
}
return {
type: "file_watch",
resource_state: payload.resource_state,
changed: payload.changed,
file,
recent_revisions: revisions,
};
}
// Calendar watch: uses incremental sync (syncToken) to fetch only changed events
async function handleCalendarChange(
payload: GoogleTriggerPayload,
headers: Record<string, string>,
) {
const calendarIdMatch = payload.resource_uri.match(/\/calendars\/([^?/]+)/);
const calendarId = calendarIdMatch
? decodeURIComponent(calendarIdMatch[1])
: "primary";
let syncToken: string | undefined;
const state = await wmill.getState();
const stateKey = `calendar_sync_${calendarId}`;
if (state?.[stateKey]) {
syncToken = state[stateKey];
}
const isBootstrap = !syncToken;
const params = new URLSearchParams({
singleEvents: "true",
// Bootstrap: max page size + minimal fields to grab the sync token fast
// Incremental: normal page size with full event data
maxResults: isBootstrap ? "2500" : "50",
...(isBootstrap
? { fields: "nextPageToken,nextSyncToken,items(id)" }
: {}),
});
if (syncToken) {
params.set("syncToken", syncToken);
} else {
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
params.set("timeMin", yesterday);
}
// Paginate through all pages to collect events and get the final nextSyncToken
const allEvents: any[] = [];
let syncReset = false;
let currentParams = params;
while (true) {
const res = await fetch(
`${GOOGLE_CALENDAR_API}/calendars/${encodeURIComponent(calendarId)}/events?${currentParams}`,
{ headers },
);
// 410 GONE means the sync token expired — wipe it and do a full sync
if (res.status === 410) {
syncReset = true;
currentParams = new URLSearchParams({
singleEvents: "true",
maxResults: "2500",
fields: "nextPageToken,nextSyncToken,items(id)",
timeMin: new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(),
});
continue;
}
const data = await res.json();
if (!isBootstrap && !syncReset && data.items) {
allEvents.push(...data.items);
}
if (data.nextSyncToken) {
await wmill.setState({ ...state, [stateKey]: data.nextSyncToken });
break;
}
if (data.nextPageToken) {
currentParams = new URLSearchParams(params);
currentParams.set("pageToken", data.nextPageToken);
currentParams.delete("syncToken");
currentParams.delete("timeMin");
} else {
break;
}
}
return {
type: "calendar_watch",
resource_state: payload.resource_state,
calendar_id: calendarId,
...(isBootstrap && { bootstrap: true }),
...(syncReset && { sync_reset: true }),
events: allEvents,
};
}
Submitted by hugo989 1 day ago