ETL / Data pipeline on tabular data
One script reply has been approved by the moderators (Verified).
Created by henri186 284 days ago · Viewed 8452 times
Submitted by henri186 (Python3)
Verified 284 days ago
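This script connects an in-memory DuckDB database to the workspace S3 bucket, queries a Parquet input file directly on S3, writes the result of a second query back to S3 as a new Parquet file, and returns that file as an S3Object.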
#requirements:
wmill>=1.229.0
#duckdb==0.9.1

# Windmill embedded integration with Polars and DuckDB for data pipelines:
# https://www.windmill.dev/docs/core_concepts/persistent_storage/large_data_files#windmill-embedded-integration-with-polars-and-duckdb-for-data-pipelines

import wmill
from wmill import S3Object
import duckdb


def main(input_file: S3Object):
    bucket = wmill.get_resource("u/admin/windmill-cloud-demo")["bucket"]

    # create a DuckDB database in memory
    # see https://duckdb.org/docs/api/python/dbapi
    conn = duckdb.connect()

    # this will default to the workspace S3 resource
    args = wmill.duckdb_connection_settings().connection_settings_str
    # this will use a designated resource instead:
    # args = wmill.duckdb_connection_settings("<PATH_TO_S3_RESOURCE>").connection_settings_str

    # connect DuckDB to the S3 bucket
    conn.execute(args)
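    # note: connection_settings_str is a ready-to-run SQL snippet that
    # configures DuckDB's S3 access (typically httpfs settings such as region,
    # endpoint and credentials, derived from the workspace S3 resource)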

    input_uri = "s3://{}/{}".format(bucket, input_file["s3"])
    output_file = "output/result.parquet"
    output_uri = "s3://{}/{}".format(bucket, output_file)
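    # input_file["s3"] is the object key inside the bucket; output_file is
    # likewise a key, which the S3Object returned at the end points at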

    # Run queries directly on the parquet file
    query_result = conn.sql(
        "SELECT * FROM read_parquet('{}')".format(input_uri)
    )
    query_result.show()
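    # conn.sql returns a lazily evaluated DuckDB relation; .show() prints a
    # preview of the result for logging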

    # Write the result of a query to a different parquet file on S3
    conn.execute(
        """
        COPY (
            SELECT COUNT(*) FROM read_parquet('{input_uri}')
        ) TO '{output_uri}' (FORMAT 'parquet');
        """.format(input_uri=input_uri, output_uri=output_uri)
    )

    conn.close()
    return S3Object(s3=output_file)
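
For reference, the returned S3Object can be passed straight to the next script of a flow. Below is a minimal sketch of such a downstream step that reuses the same connection pattern to read the count back; the resource path and the function signature mirror the example above and are assumptions, not part of the original script.

import wmill
from wmill import S3Object
import duckdb


def main(result_file: S3Object):
    # assumption: same workspace S3 resource as in the script above
    bucket = wmill.get_resource("u/admin/windmill-cloud-demo")["bucket"]

    conn = duckdb.connect()
    conn.execute(wmill.duckdb_connection_settings().connection_settings_str)

    # fetch the single COUNT(*) row written by the previous step
    row = conn.sql(
        "SELECT * FROM read_parquet('s3://{}/{}')".format(bucket, result_file["s3"])
    ).fetchone()

    conn.close()
    return row[0]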