1 | import { MongoClient, ObjectId } from "mongodb@6"; |
2 | import { getState, setState } from "windmill-client@1"; |
3 |
|
/**
 * Connection settings for a MongoDB deployment, as consumed by this script.
 */
type Mongodb = {
  /** Seed servers; each one is rendered as `host:port` in the connection string. */
  servers: Array<{ host: string; port: number }>;
  /** Login details; only applied when `username` is non-empty. */
  credential: {
    username: string;
    password: string;
    /** Database used as the driver's `authSource`. */
    db: string;
    /** Optional value forwarded as the driver's `authMechanism`. */
    mechanism?: string;
  };
  /** Database queried when the caller does not name one explicitly. */
  db: string;
  /** Forwarded as the driver's `tls` option. */
  tls: boolean;
};
15 |
|
/**
 * Get recently inserted documents
 *
 * Return documents inserted since the last run, using the timestamp embedded
 * in their ObjectId `_id`.
 *
 * The previous run's timestamp (in seconds) is read with `getState`; when no
 * state is stored yet the cutoff defaults to 0. The new checkpoint is sampled
 * before the query starts but is only persisted once the query has succeeded
 * and the client has been closed, so a failed run does not advance the
 * checkpoint and skip documents.
 *
 * @param auth - MongoDB connection settings.
 * @param collection - Name of the collection to read from.
 * @param database - Database name; falls back to `auth.db` when omitted or empty.
 * @returns The documents whose `_id` is greater than the ObjectId built from
 *   the previous checkpoint.
 */
export async function main(
  auth: Mongodb,
  collection: string,
  database?: string,
) {
  const lastCheck: number = (await getState()) ?? 0;
  // Sample the clock before querying so that documents inserted while the
  // query is running are still picked up by the next run.
  const checkedAt = Date.now() / 1000;

  const client = await mongoClient(auth);
  let documents;
  try {
    documents = await client
      .db(database || auth.db)
      .collection(collection)
      // createFromTime expects whole seconds, hence the floor.
      .find({ _id: { $gt: ObjectId.createFromTime(Math.floor(lastCheck)) } })
      .toArray();
  } finally {
    await client.close();
  }

  // Persist the checkpoint only after a successful fetch; if anything above
  // threw, the next run retries from the previous checkpoint.
  await setState(checkedAt);
  return documents;
}
39 |
|
/**
 * Build a MongoClient from the given connection settings and connect it.
 *
 * All servers are joined into one `mongodb://host:port,...` connection string.
 * Credentials are passed through the driver options (not the URL) and only
 * when a username is present.
 *
 * @param auth - MongoDB connection settings.
 * @returns The client after `connect()` has resolved; the caller closes it.
 */
async function mongoClient(auth: Mongodb) {
  const { servers, credential, tls } = auth;
  const seedList = servers.map(({ host, port }) => `${host}:${port}`).join(",");

  const connectionOptions: any = { tls };
  if (credential?.username) {
    Object.assign(connectionOptions, {
      auth: {
        username: credential.username,
        password: credential.password,
      },
      authSource: credential.db,
    });
    // The mechanism is optional; leave the driver default when it is unset.
    if (credential.mechanism) {
      connectionOptions.authMechanism = credential.mechanism;
    }
  }

  const connection = new MongoClient(`mongodb://${seedList}`, connectionOptions);
  await connection.connect();
  return connection;
}
57 |
|