#requirements:
#wmill>=1.229.0
#duckdb==0.9.1
# Windmill embedded integration with Polars and DuckDB for data pipelines: https://www.windmill.dev/docs/core_concepts/persistent_storage/large_data_files#windmill-embedded-integration-with-polars-and-duckdb-for-data-pipelines
import wmill
from wmill import S3Object
import duckdb
def main(input_file: S3Object):
    # the bucket name comes from the workspace S3 resource; used below to build s3:// URIs
    bucket = wmill.get_resource("u/admin/windmill-cloud-demo")["bucket"]
    # create an in-memory DuckDB database
    # see https://duckdb.org/docs/api/python/dbapi
    conn = duckdb.connect()
    # fetch the DuckDB connection settings; this defaults to the workspace S3 resource
    args = wmill.duckdb_connection_settings().connection_settings_str
    # to target a specific S3 resource instead:
    # args = wmill.duckdb_connection_settings("<PATH_TO_S3_RESOURCE>").connection_settings_str
    # connect DuckDB to the S3 bucket by applying the settings
    conn.execute(args)
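    # The settings string applied above is, roughly, a sequence of DuckDB SET
    # statements (s3_endpoint, s3_access_key_id, s3_secret_access_key, ...)
    # derived from the S3 resource; the exact contents depend on the wmill
    # client version, so treat this description as illustrative.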
    input_uri = "s3://{}/{}".format(bucket, input_file["s3"])
    output_file = "output/result.parquet"
    output_uri = "s3://{}/{}".format(bucket, output_file)
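    # For illustration, with a hypothetical bucket "demo-bucket" and
    # input_file {"s3": "data/input.parquet"}:
    #   input_uri  == "s3://demo-bucket/data/input.parquet"
    #   output_uri == "s3://demo-bucket/output/result.parquet"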
    # run queries directly on the input parquet file
    query_result = conn.sql(
        """
        SELECT * FROM read_parquet('{}')
        """.format(input_uri)
    )
    query_result.show()
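    # .show() pretty-prints the relation to stdout, which surfaces in the
    # Windmill job logs; for programmatic access, DuckDB relations also expose
    # fetchall() and df() (the latter returns a pandas DataFrame).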
    # write the result of an aggregation query to a different parquet file on S3
    conn.execute(
        """
        COPY (
            SELECT COUNT(*) FROM read_parquet('{input_uri}')
        ) TO '{output_uri}' (FORMAT 'parquet');
        """.format(input_uri=input_uri, output_uri=output_uri)
    )
    conn.close()
    # return the output file as an S3Object so downstream steps can consume it
    return S3Object(s3=output_file)
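
# Minimal usage sketch (outside of Windmill's runner; the key below is
# hypothetical and the workspace S3 resource must hold valid credentials):
#
#   result = main(S3Object(s3="data/input.parquet"))
#   # result == {"s3": "output/result.parquet"}, pointing at a one-row parquet
#   # file containing the input's row count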