0

ETL / Data pipeline on tabular data

by
Published Mar 13, 2024

Windmill embedded integration with Polars and DuckDB for data pipelines: https://www.windmill.dev/docs/core_concepts/persistent_storage/large_data_files#windmill-embedded-integration-with-polars-and-duckdb-for-data-pipelines

Script polars Verified

The script

Submitted by henri186 Python3
Verified 399 days ago
1
#requirements:
2
#polars==0.20.2
3
#s3fs==2023.12.0
4
#wmill>=1.229.0
5

6
# Windmill embedded integration with Polars and DuckDB for data pipelines: https://www.windmill.dev/docs/core_concepts/persistent_storage/large_data_files#windmill-embedded-integration-with-polars-and-duckdb-for-data-pipelines
7

8
import wmill
9
from wmill import S3Object
10
import polars as pl
11
import s3fs
12

13

14
def main(input_file: S3Object) -> S3Object:
    """Read a Parquet file from S3 with Polars and write the result back to S3.

    Args:
        input_file: S3 object whose ``"s3"`` entry is the key of the input
            Parquet file inside the bucket.

    Returns:
        An ``S3Object`` whose ``s3`` key is ``output/result.parquet``, the
        location the resulting dataframe was written to.
    """
    bucket = wmill.get_resource("<PATH_TO_S3_RESOURCE>")["bucket"]

    # Fetch the connection settings once and reuse them for both the read
    # (storage_options) and the write (s3fs_args).
    # Called without an argument, this defaults to the workspace s3 resource.
    # To use a designated resource instead, pass its path:
    #   wmill.polars_connection_settings("<PATH_TO_S3_RESOURCE>")
    connection_settings = wmill.polars_connection_settings()
    storage_options = connection_settings.storage_options

    # The input is a parquet file. read_parquet loads it eagerly; .lazy() then
    # wraps the loaded dataframe so the processing below can be written
    # against a LazyFrame.
    # Polars can read various file types, see
    # https://pola-rs.github.io/polars/py-polars/html/reference/io.html
    input_key = input_file["s3"]
    input_uri = f"s3://{bucket}/{input_key}"
    input_df = pl.read_parquet(input_uri, storage_options=storage_options).lazy()

    # Process the Polars dataframe here. See Polars docs:
    # for dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html
    # for lazy dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html
    output_df = input_df.collect()
    print(output_df)

    # To write back the result to S3, Polars needs an s3fs connection.
    s3 = s3fs.S3FileSystem(**connection_settings.s3fs_args)
    output_file = "output/result.parquet"
    output_uri = f"s3://{bucket}/{output_file}"
    with s3.open(output_uri, mode="wb") as output_s3:
        # Persist the output dataframe back to S3.
        output_df.write_parquet(output_s3)

    # Return a pointer to the written file (the key only, without the bucket).
    return S3Object(s3=output_file)