Search... Ctrl + K

Import workspace from tarball (windmill)
One script reply has been approved by the moderators Verified

Created by admin 50 days ago Used 61 times 1 Point

Takes a tarball exported from a workspace and imports the scripts, resources, and resource types it contains.

It is used as the webhook target for windmill-gh-action-deploy.

No comments yet

Points: 0
python3
One script reply has been approved by the moderators
Approved
import tarfile
import io
import wmill
import json

from windmill_api.api.script import get_script_by_path, create_script
from windmill_api.models.create_script_json_body import CreateScriptJsonBody
from windmill_api.models.create_script_json_body_language import (
    CreateScriptJsonBodyLanguage,
)


from windmill_api.models.create_script_json_body_schema import (
    CreateScriptJsonBodySchema,
)
from windmill_api.models.update_resource_json_body import UpdateResourceJsonBody
from windmill_api.models.create_resource_json_body import CreateResourceJsonBody

from windmill_api.models.update_resource_type_json_body import (
    UpdateResourceTypeJsonBody,
)

from windmill_api.models.create_resource_type_json_body import (
    CreateResourceTypeJsonBody,
)
from windmill_api.api.resource import (
    create_resource,
    create_resource_type,
    update_resource,
    update_resource_type,
    get_resource,
    get_resource_type,
)

# API client and workspace identifier obtained from the wmill helper module;
# both are passed to every windmill_api call made by main().
client = wmill.create_client()
workspace = wmill.get_workspace()


# Member-name prefixes inside the tarball that select which importer
# handles an archive entry (scripts, resources, or resource types).
SCRIPTS_PREFIX = "scripts/"
RESOURCES_PREFIX = "resources/"
RESOURCE_TYPES_PREFIX = "resource_types/"


def main(tarball: bytes, dry_run: bool = False):
    """Import scripts, resources and resource types from a workspace tarball.

    Every archive member is dispatched on its name:

    * ``scripts/<path>.py|.ts|.go`` (optionally accompanied by a
      ``scripts/<path>.json`` metadata file) is uploaded as a new script
      version unless the workspace already holds identical content/metadata.
    * ``resources/<path>.json`` is created, or updated when its ``value``
      differs from the existing resource.
    * ``resource_types/<path>.json`` is created, or updated when its
      ``schema`` differs from the existing resource type.

    Args:
        tarball: Raw bytes of the tar archive to import.
        dry_run: When true, only read from the workspace and log what would
            be done; no script, resource or resource type is written.
    """
    print("Tarball of size: {} bytes".format(len(tarball)))
    # The context manager guarantees the archive is closed even if an
    # import step raises.
    with tarfile.open(fileobj=io.BytesIO(tarball), mode="r") as tar:
        # A set makes the per-script "is there a metadata file?" lookup O(1).
        names = set(tar.getnames())

        for m in tar.getmembers():
            if m.name.startswith(SCRIPTS_PREFIX) and _script_extension(m.name):
                _import_script(tar, m, names, dry_run)
            elif m.name.startswith(RESOURCES_PREFIX) and m.name.endswith(".json"):
                _import_resource(tar, m, dry_run)
            elif m.name.startswith(RESOURCE_TYPES_PREFIX) and m.name.endswith(
                ".json"
            ):
                _import_resource_type(tar, m, dry_run)


def _script_extension(name: str):
    """Return the supported script extension that ``name`` ends with, or None."""
    for ext in (".py", ".ts", ".go"):
        if name.endswith(ext):
            return ext
    return None


def _import_script(
    tar: tarfile.TarFile, member: tarfile.TarInfo, names, dry_run: bool
):
    """Upload one script from the tarball unless the workspace copy is identical."""
    ext = _script_extension(member.name)
    # Strip the actual extension rather than assuming a fixed length.
    base = member.name[: -len(ext)]
    path = base[len(SCRIPTS_PREFIX) :]
    print("Processing script {}, {}".format(path, member.name))
    get_script_response = get_script_by_path.sync_detailed(
        workspace=workspace, path=path, client=client
    )

    # Optional sidecar metadata file; parsed once and reused below.
    json_tar_path = "{}.json".format(base)
    has_json = json_tar_path in names
    metadata = None
    if has_json:
        metadata = json.loads(tar.extractfile(tar.getmember(json_tar_path)).read())

    content = tar.extractfile(member).read().decode("utf-8")

    # Reset every field per script; in particular `lock` must not be left
    # unbound (or carry over from a previous script) for a brand-new script
    # that ships without metadata.
    parent_hash = None
    summary = None
    description = None
    is_template = None
    schema = None
    lock = None

    if get_script_response.status_code == 200:
        old_script = json.loads(get_script_response.content)

        if old_script["content"] == content:
            if not has_json:
                print(
                    "same content and no metadata for this script in tarball, no need to update"
                )
                return

            compared_keys = ("summary", "description", "is_template", "schema", "lock")
            if all(old_script.get(k) == metadata.get(k) for k in compared_keys):
                print("same content and same metadata, no need to update")
                return

        # A new version is chained onto the existing one and inherits its
        # metadata unless the tarball overrides it below.
        parent_hash = old_script["hash"]
        summary = old_script.get("summary")
        description = old_script.get("description")
        is_template = old_script.get("is_template")
        schema = old_script.get("schema")

    if has_json:
        summary = metadata.get("summary")
        description = metadata.get("description")
        is_template = metadata.get("is_template")
        schema = metadata.get("schema")
        lock = metadata.get("lock")
        # An empty lock list is sent as "no lock".
        if lock == []:
            lock = None

    schema = CreateScriptJsonBodySchema.from_dict(schema if schema else dict())

    print("Uploading new version of script at path: {}".format(path))

    if dry_run:
        print("Skipped because dry-run")
        return

    language = {
        ".ts": CreateScriptJsonBodyLanguage.DENO,
        ".py": CreateScriptJsonBodyLanguage.PYTHON3,
        ".go": CreateScriptJsonBodyLanguage.GO,
    }[ext]

    r = create_script.sync_detailed(
        workspace=workspace,
        client=client,
        json_body=CreateScriptJsonBody(
            language=language,
            content=content,
            path=path,
            parent_hash=parent_hash,
            summary=summary,
            description=description,
            is_template=is_template,
            schema=schema,
            lock=lock,
        ),
    )
    print(r.content)


def _import_resource(tar: tarfile.TarFile, member: tarfile.TarInfo, dry_run: bool):
    """Create one resource, or update it when its ``value`` has changed."""
    path = member.name[len(RESOURCES_PREFIX) : -len(".json")]
    print("Processing resource {}, {}".format(path, member.name))
    get_resource_response = get_resource.sync_detailed(
        workspace=workspace, path=path, client=client
    )
    resource = json.loads(tar.extractfile(member).read().decode("utf-8"))

    if get_resource_response.status_code == 200:
        old_resource = json.loads(get_resource_response.content)
        if resource["value"] == old_resource["value"]:
            print("Skipping updating identical resource")
            return
        print("Updating existing resource")
        if dry_run:
            print("Skipped because dry-run")
            return
        r = update_resource.sync_detailed(
            workspace=workspace,
            path=path,
            json_body=UpdateResourceJsonBody.from_dict(resource),
            client=client,
        )
    else:
        print("Creating new resource")
        if dry_run:
            print("Skipped because dry-run")
            return
        r = create_resource.sync_detailed(
            workspace=workspace,
            json_body=CreateResourceJsonBody.from_dict(resource),
            client=client,
        )
    print(r)


def _import_resource_type(
    tar: tarfile.TarFile, member: tarfile.TarInfo, dry_run: bool
):
    """Create one resource type, or update it when its ``schema`` has changed."""
    path = member.name[len(RESOURCE_TYPES_PREFIX) : -len(".json")]
    print("Processing resource type {}, {}".format(path, member.name))
    get_resource_response = get_resource_type.sync_detailed(
        workspace=workspace, path=path, client=client
    )
    resource = json.loads(tar.extractfile(member).read().decode("utf-8"))
    print(resource)

    if get_resource_response.status_code == 200:
        old_resource_type = json.loads(get_resource_response.content)
        if resource["schema"] == old_resource_type["schema"]:
            print("Skipping updating identical resource type")
            return
        print("Updating existing resource type")
        if dry_run:
            print("Skipped because dry-run")
            return
        r = update_resource_type.sync_detailed(
            workspace=workspace,
            path=path,
            json_body=UpdateResourceTypeJsonBody.from_dict(resource),
            client=client,
        )
    else:
        print("Creating new resource type")
        if dry_run:
            print("Skipped because dry-run")
            return
        r = create_resource_type.sync_detailed(
            workspace=workspace,
            json_body=CreateResourceTypeJsonBody.from_dict(resource),
            client=client,
        )
    print(r)
All edits Permalink

Submitted by admin 50 days ago

Edited 44 days ago

No comments yet