#requirements:
#polars==0.20.2
#s3fs==2023.12.0
#wmill>=1.229.0
# Windmill embedded integration with Polars and DuckDB for data pipelines: https://www.windmill.dev/docs/core_concepts/persistent_storage/large_data_files#windmill-embedded-integration-with-polars-and-duckdb-for-data-pipelines
import wmill
from wmill import S3Object
import polars as pl
import s3fs
def main(input_file: S3Object) -> S3Object:
    """Read a Parquet file from S3 with Polars, process it, and write the result back to S3.

    Args:
        input_file: S3 object for the input Parquet file; its ``"s3"`` key is
            used as the object key inside ``bucket``.

    Returns:
        An S3Object whose ``s3`` key is ``"output/result.parquet"``, the key
        the processed dataframe was written to.
    """
    # NOTE: the bucket is read from the designated resource, while the connection
    # settings below default to the workspace S3 resource — make sure both point
    # at the same bucket/credentials.
    bucket = wmill.get_resource("<PATH_TO_S3_RESOURCE>")["bucket"]
    # Fetch the connection settings once and reuse them for both the read and the
    # write, so input and output always go through the same S3 resource.
    # This will default to the workspace s3 resource:
    connection_settings = wmill.polars_connection_settings()
    # This will use the designated resource instead:
    # connection_settings = wmill.polars_connection_settings("<PATH_TO_S3_RESOURCE>")
    storage_options = connection_settings.storage_options

    # The input is a parquet file. read_parquet loads it eagerly; .lazy() then wraps
    # the result in a LazyFrame so further processing can be expressed lazily.
    # Polars can read various file types, see
    # https://pola-rs.github.io/polars/py-polars/html/reference/io.html
    input_uri = f"s3://{bucket}/{input_file['s3']}"
    input_df = pl.read_parquet(input_uri, storage_options=storage_options).lazy()

    # Process the Polars dataframe here. See Polars docs:
    # for dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html
    # for lazy dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html
    output_df = input_df.collect()
    print(output_df)

    # To write back the result to S3, Polars needs an s3fs connection.
    s3 = s3fs.S3FileSystem(**connection_settings.s3fs_args)
    output_file = "output/result.parquet"
    output_uri = f"s3://{bucket}/{output_file}"
    with s3.open(output_uri, mode="wb") as output_s3:
        # Persist the output dataframe back to S3.
        output_df.write_parquet(output_s3)
    return S3Object(s3=output_file)
# Submitted by hugo697 399 days ago
#requirements:
#polars==0.20.2
#s3fs==2023.12.0
#wmill>=1.229.0
# Windmill embedded integration with Polars and DuckDB for data pipelines: https://www.windmill.dev/docs/core_concepts/persistent_storage/large_data_files#windmill-embedded-integration-with-polars-and-duckdb-for-data-pipelines
import wmill
from wmill import S3Object
import polars as pl
import s3fs
def main(input_file: S3Object) -> S3Object:
    """Read a Parquet file from S3 with Polars, process it, and write the result back to S3.

    Args:
        input_file: S3 object for the input Parquet file; its ``"s3"`` key is
            used as the object key inside ``bucket``.

    Returns:
        An S3Object whose ``s3`` key is ``"output/result.parquet"``, the key
        the processed dataframe was written to.
    """
    # NOTE: the bucket is read from the designated resource, while the connection
    # settings below default to the workspace S3 resource — make sure both point
    # at the same bucket/credentials.
    bucket = wmill.get_resource("<PATH_TO_S3_RESOURCE>")["bucket"]
    # Fetch the connection settings once and reuse them for both the read and the
    # write, so input and output always go through the same S3 resource.
    # This will default to the workspace s3 resource:
    connection_settings = wmill.polars_connection_settings()
    # This will use the designated resource instead:
    # connection_settings = wmill.polars_connection_settings("<PATH_TO_S3_RESOURCE>")
    storage_options = connection_settings.storage_options

    # The input is a parquet file. read_parquet loads it eagerly; .lazy() then wraps
    # the result in a LazyFrame so further processing can be expressed lazily.
    # Polars can read various file types, see
    # https://pola-rs.github.io/polars/py-polars/html/reference/io.html
    input_uri = f"s3://{bucket}/{input_file['s3']}"
    input_df = pl.read_parquet(input_uri, storage_options=storage_options).lazy()

    # Process the Polars dataframe here. See Polars docs:
    # for dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html
    # for lazy dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html
    output_df = input_df.collect()
    print(output_df)

    # To write back the result to S3, Polars needs an s3fs connection.
    s3 = s3fs.S3FileSystem(**connection_settings.s3fs_args)
    output_file = "output/result.parquet"
    output_uri = f"s3://{bucket}/{output_file}"
    with s3.open(output_uri, mode="wb") as output_s3:
        # Persist the output dataframe back to S3.
        output_df.write_parquet(output_s3)
    return S3Object(s3=output_file)
# Submitted by henri186 819 days ago