#requirements:
#polars==0.20.2
#s3fs==2023.12.0
#wmill>=1.229.0

|
# Windmill embedded integration with Polars and DuckDB for data pipelines: see the Windmill documentation on large data files and data pipelines.
7 |
|
8 | import wmill |
9 | from wmill import S3Object |
10 | import polars as pl |
11 | import s3fs |
12 |
|
13 |
|
def main(input_file: S3Object, output_file: str = "output/result.parquet"):
    """Read a Parquet file from S3 with Polars, process it, and write the result back to S3.

    Args:
        input_file: Windmill S3 object for the input Parquet file. Its ``"s3"``
            entry is used as the object key inside ``bucket``.
        output_file: Object key, inside the same bucket, that the processed
            dataframe is written to. Defaults to ``"output/result.parquet"``.

    Returns:
        An ``S3Object`` referencing ``output_file``.
    """
    bucket = wmill.get_resource("<PATH_TO_S3_RESOURCE>")["bucket"]

    # Fetch the connection settings once and reuse them for both the Polars
    # read and the s3fs write. Both sides then use the same settings, and only
    # one lookup is made.
    # This defaults to the workspace S3 resource. To use a designated resource:
    # connection_settings = wmill.polars_connection_settings("<PATH_TO_S3_RESOURCE>")
    # NOTE(review): `bucket` is read from "<PATH_TO_S3_RESOURCE>" while the
    # credentials default to the workspace resource. Confirm they refer to the
    # same storage.
    connection_settings = wmill.polars_connection_settings()

    # The input is a Parquet file. `read_parquet` loads it eagerly; `.lazy()`
    # only wraps the already-loaded frame so that processing can be written
    # as a lazy query. Polars can also read other file types (see the Polars
    # IO reference).
    input_uri = f"s3://{bucket}/{input_file['s3']}"
    input_df = pl.read_parquet(
        input_uri, storage_options=connection_settings.storage_options
    ).lazy()

    # Process the Polars dataframe here (see the Polars DataFrame and
    # LazyFrame API references). `collect()` materializes the lazy query.
    output_df = input_df.collect()
    print(output_df)

    # Polars writes the result to S3 through an s3fs file handle.
    s3 = s3fs.S3FileSystem(**connection_settings.s3fs_args)
    output_uri = f"s3://{bucket}/{output_file}"
    with s3.open(output_uri, mode="wb") as output_s3:
        # Persist the output dataframe back to S3.
        output_df.write_parquet(output_s3)

    return S3Object(s3=output_file)