Edits history of script submission #177 for ' Import workspace from tarball (windmill)'

  • python3
    One script reply has been approved by the moderators
    Ap­pro­ved
    import tarfile
    import io
    import wmill
    import json
    
    from windmill_api.api.script import get_script_by_path, create_script
    from windmill_api.models.create_script_json_body import CreateScriptJsonBody
    from windmill_api.models.create_script_json_body_language import (
        CreateScriptJsonBodyLanguage,
    )
    
    
    from windmill_api.models.create_script_json_body_schema import (
        CreateScriptJsonBodySchema,
    )
    from windmill_api.models.update_resource_json_body import UpdateResourceJsonBody
    from windmill_api.models.create_resource_json_body import CreateResourceJsonBody
    
    from windmill_api.models.update_resource_type_json_body import (
        UpdateResourceTypeJsonBody,
    )
    
    from windmill_api.models.create_resource_type_json_body import (
        CreateResourceTypeJsonBody,
    )
    from windmill_api.api.resource import (
        create_resource,
        create_resource_type,
        update_resource,
        update_resource_type,
        get_resource,
        get_resource_type,
    )
    
    client = wmill.create_client()
    workspace = wmill.get_workspace()
    
    
    SCRIPTS_PREFIX = "scripts/"
    RESOURCES_PREFIX = "resources/"
    RESOURCE_TYPES_PREFIX = "resource_types/"
    
    
    def main(tarball: bytes, dry_run: bool = False):
        print("Tarball of size: {} bytes".format(len(tarball)))
        io_bytes = io.BytesIO(tarball)
        tar = tarfile.open(fileobj=io_bytes, mode="r")
        names = tar.getnames()
    
        for m in tar.getmembers():
            if m.name.startswith(SCRIPTS_PREFIX) and (
                m.name.endswith(".py") or m.name.endswith(".ts") or m.name.endswith(".go")
            ):
                path = m.name[len(SCRIPTS_PREFIX) : -len(".py")]
                print("Processing script {}, {}".format(path, m.name))
                get_script_response = get_script_by_path.sync_detailed(
                    workspace=workspace, path=path, client=client
                )
    
    
                json_tar_path = "{}.json".format(m.name[: -len(".py")])
                has_json = json_tar_path in names
    
                parent_hash = None
                summary = None
                description = None
                is_template = None
                schema = None
    
                content = tar.extractfile(m).read().decode("utf-8")
    
                if get_script_response.status_code == 200:
                    old_script = json.loads(get_script_response.content)
    
                    if old_script["content"] == content:
                        if not has_json:
                            print(
                                "same content and no metadata for this script in tarball, no need to update"
                            )
                            continue
    
                        json_content = tar.extractfile(tar.getmember(json_tar_path)).read()
                        metadata = json.loads(json_content)
    
                        if (
                            old_script.get("summary") == metadata.get("summary")
                            and old_script.get("description") == metadata.get("description")
                            and old_script.get("is_template") == metadata.get("is_template")
                            and old_script.get("schema") == metadata.get("schema")
                            and old_script.get("lock") == metadata.get("lock")
                        ):
                            print("same content and no metadata, no need to update")
                            continue
    
                    parent_hash = old_script["hash"]
                    summary = old_script.get("summary")
                    description = old_script.get("description")
                    is_template = old_script.get("is_template")
                    schema = old_script.get("schema")
                    lock = None
    
                if has_json:
                    json_content = tar.extractfile(tar.getmember(json_tar_path)).read()
                    metadata = json.loads(json_content)
    
                    summary = metadata.get("summary")
                    description = metadata.get("description")
                    is_template = metadata.get("is_template")
                    schema = metadata.get("schema")
                    lock = metadata.get("lock")
                    if lock == []:
                        lock = None
    
                if schema:
                    schema = CreateScriptJsonBodySchema.from_dict(schema)
                else:
                    schema = CreateScriptJsonBodySchema.from_dict(dict())
    
                print("Uploading new version of script at path: {}".format(path))
    
                if dry_run:
                    print("Skipped because dry-run")
                else:
                    if m.name.endswith(".ts"):
                        language = CreateScriptJsonBodyLanguage.DENO
                    elif m.name.endswith(".py"):
                        language = CreateScriptJsonBodyLanguage.PYTHON3
                    elif m.name.endswith(".go"):
                        language = CreateScriptJsonBodyLanguage.GO
    
                    r = create_script.sync_detailed(
                        workspace=workspace,
                        client=client,
                        json_body=CreateScriptJsonBody(
                            language=language,
                            content=content,
                            path=path,
                            parent_hash=parent_hash,
                            summary=summary,
                            description=description,
                            is_template=is_template,
                            schema=schema,
                            lock=lock,
                        ),
                    )
                    print(r.content)
            if m.name.startswith(RESOURCES_PREFIX) and m.name.endswith(".json"):
                path = m.name[len(RESOURCES_PREFIX) : -len(".json")]
                print("Processing resource {}, {}".format(path, m.name))
                get_resource_response = get_resource.sync_detailed(
                    workspace=workspace, path=path, client=client
                )
                content = tar.extractfile(m).read().decode("utf-8")
                resource = json.loads(content)
                res_error = False
                if not res_error and get_resource_response.status_code == 200:
                    old_resource = json.loads(get_resource_response.content)
                    if resource["value"] != old_resource["value"]:
                        print("Updating existing resource")
                        r = update_resource.sync_detailed(
                            workspace=workspace,
                            path=path,
                            json_body=UpdateResourceJsonBody.from_dict(resource),
                            client=client,
                        )
                        print(r)
                    else:
                        print("Skipping updating identical resource")
                else:
                    print("Creating new resource")
                    r = create_resource.sync_detailed(
                        workspace=workspace,
                        json_body=CreateResourceJsonBody.from_dict(resource),
                        client=client,
                    )
                    print(r)
    
            if m.name.startswith(RESOURCE_TYPES_PREFIX) and m.name.endswith(".json"):
                path = m.name[len(RESOURCE_TYPES_PREFIX) : -len(".json")]
                print("Processing resource type {}, {}".format(path, m.name))
                get_resource_response = get_resource_type.sync_detailed(
                    workspace=workspace, path=path, client=client
                )
                content = tar.extractfile(m).read().decode("utf-8")
                resource = json.loads(content)
                print(resource)
                res_error = False
    
                if not res_error and get_resource_response.status_code == 200:
                    old_resource_type = json.loads(get_resource_response.content)
                    if resource["schema"] != old_resource_type["schema"]:
                        print("Updating existing resource type")
                        r = update_resource_type.sync_detailed(
                            workspace=workspace,
                            path=path,
                            json_body=UpdateResourceTypeJsonBody.from_dict(resource),
                            client=client,
                        )
                        print(r)
                    else:
                        print("Skipping updating identical resource type")
    
                else:
                    print("Creating new resource type")
                    r = create_resource_type.sync_detailed(
                        workspace=workspace,
                        json_body=CreateResourceTypeJsonBody.from_dict(resource),
                        client=client,
                    )
                    print(r)
    

    Submitted by admin 1321 days ago

  • python3
    import tarfile
    import io
    import wmill
    import json
    
    from windmill_api.api.script import get_script_by_path, create_script
    from windmill_api.models.create_script_json_body import CreateScriptJsonBody
    from windmill_api.models.create_script_json_body_language import (
        CreateScriptJsonBodyLanguage,
    )
    
    
    from windmill_api.models.create_script_json_body_schema import (
        CreateScriptJsonBodySchema,
    )
    from windmill_api.models.update_resource_json_body import UpdateResourceJsonBody
    from windmill_api.models.create_resource_json_body import CreateResourceJsonBody
    
    from windmill_api.models.update_resource_type_json_body import (
        UpdateResourceTypeJsonBody,
    )
    
    from windmill_api.models.create_resource_type_json_body import (
        CreateResourceTypeJsonBody,
    )
    from windmill_api.api.resource import (
        create_resource,
        create_resource_type,
        update_resource,
        update_resource_type,
        get_resource,
        get_resource_type,
    )
    
    client = wmill.create_client()
    workspace = wmill.get_workspace()
    
    
    SCRIPTS_PREFIX = "scripts/"
    RESOURCES_PREFIX = "resources/"
    RESOURCE_TYPES_PREFIX = "resource_types/"
    
    
    def main(tarball: bytes, dry_run: bool = False):
        print("Tarball of size: {} bytes".format(len(tarball)))
        io_bytes = io.BytesIO(tarball)
        tar = tarfile.open(fileobj=io_bytes, mode="r")
        names = tar.getnames()
    
        for m in tar.getmembers():
            if m.name.startswith(SCRIPTS_PREFIX) and (
                m.name.endswith(".py") or m.name.endswith(".ts") or m.name.endswith(".go")
            ):
                path = m.name[len(SCRIPTS_PREFIX) : -len(".py")]
                print("Processing script {}, {}".format(path, m.name))
                get_script_response = get_script_by_path.sync_detailed(
                    workspace=workspace, path=path, client=client
                )
    
    
                json_tar_path = "{}.json".format(m.name[: -len(".py")])
                has_json = json_tar_path in names
    
                parent_hash = None
                summary = None
                description = None
                is_template = None
                schema = None
    
                content = tar.extractfile(m).read().decode("utf-8")
    
                if get_script_response.status_code == 200:
                    old_script = json.loads(get_script_response.content)
    
                    if old_script["content"] == content:
                        if not has_json:
                            print(
                                "same content and no metadata for this script in tarball, no need to update"
                            )
                            continue
    
                        json_content = tar.extractfile(tar.getmember(json_tar_path)).read()
                        metadata = json.loads(json_content)
    
                        if (
                            old_script.get("summary") == metadata.get("summary")
                            and old_script.get("description") == metadata.get("description")
                            and old_script.get("is_template") == metadata.get("is_template")
                            and old_script.get("schema") == metadata.get("schema")
                            and old_script.get("lock") == metadata.get("lock")
                        ):
                            print("same content and no metadata, no need to update")
                            continue
    
                    parent_hash = old_script["hash"]
                    summary = old_script.get("summary")
                    description = old_script.get("description")
                    is_template = old_script.get("is_template")
                    schema = old_script.get("schema")
                    lock = None
    
                if has_json:
                    json_content = tar.extractfile(tar.getmember(json_tar_path)).read()
                    metadata = json.loads(json_content)
    
                    summary = metadata.get("summary")
                    description = metadata.get("description")
                    is_template = metadata.get("is_template")
                    schema = metadata.get("schema")
                    lock = metadata.get("lock")
                    if lock == []:
                        lock = None
    
                if schema:
                    schema = CreateScriptJsonBodySchema.from_dict(schema)
                else:
                    schema = CreateScriptJsonBodySchema.from_dict(dict())
    
                print("Uploading new version of script at path: {}".format(path))
    
                if dry_run:
                    print("Skipped because dry-run")
                else:
                    if m.name.endswith(".ts"):
                        language = CreateScriptJsonBodyLanguage.DENO
                    elif m.name.endswith(".py"):
                        language = CreateScriptJsonBodyLanguage.PYTHON3
                    elif m.name.endswith(".go"):
                        language = CreateScriptJsonBodyLanguage.GO
    
                    r = create_script.sync_detailed(
                        workspace=workspace,
                        client=client,
                        json_body=CreateScriptJsonBody(
                            language=language,
                            content=content,
                            path=path,
                            parent_hash=parent_hash,
                            summary=summary,
                            description=description,
                            is_template=is_template,
                            schema=schema,
                            lock=lock,
                        ),
                    )
                    print(r.content)
            if m.name.startswith(RESOURCES_PREFIX) and m.name.endswith(".json"):
                path = m.name[len(RESOURCES_PREFIX) : -len(".json")]
                print("Processing resource {}, {}".format(path, m.name))
                get_resource_response = get_resource.sync_detailed(
                    workspace=workspace, path=path, client=client
                )
                content = tar.extractfile(m).read().decode("utf-8")
                resource = json.loads(content)
                res_error = False
                if not res_error and get_resource_response.status_code == 200:
                    old_resource = json.loads(get_resource_response.content)
                    if resource["value"] != old_resource["value"]:
                        print("Updating existing resource")
                        r = update_resource.sync_detailed(
                            workspace=workspace,
                            path=path,
                            json_body=UpdateResourceJsonBody.from_dict(resource),
                            client=client,
                        )
                        print(r)
                    else:
                        print("Skipping updating identical resource")
                else:
                    print("Creating new resource")
                    r = create_resource.sync_detailed(
                        workspace=workspace,
                        json_body=CreateResourceJsonBody.from_dict(resource),
                        client=client,
                    )
                    print(r)
    
            if m.name.startswith(RESOURCE_TYPES_PREFIX) and m.name.endswith(".json"):
                path = m.name[len(RESOURCE_TYPES_PREFIX) : -len(".json")]
                print("Processing resource type {}, {}".format(path, m.name))
                get_resource_response = get_resource_type.sync_detailed(
                    workspace=workspace, path=path, client=client
                )
                content = tar.extractfile(m).read().decode("utf-8")
                resource = json.loads(content)
                print(resource)
                res_error = False
    
                if not res_error and get_resource_response.status_code == 200:
                    old_resource_type = json.loads(get_resource_response.content)
                    if resource["schema"] != old_resource_type["schema"]:
                        print("Updating existing resource type")
                        r = update_resource_type.sync_detailed(
                            workspace=workspace,
                            path=path,
                            json_body=UpdateResourceTypeJsonBody.from_dict(resource),
                            client=client,
                        )
                        print(r)
                    else:
                        print("Skipping updating identical resource type")
    
                else:
                    print("Creating new resource type")
                    r = create_resource_type.sync_detailed(
                        workspace=workspace,
                        json_body=CreateResourceTypeJsonBody.from_dict(resource),
                        client=client,
                    )
                    print(r)
    

    Submitted by admin 1322 days ago

  • python3
    import tarfile
    import io
    import wmill
    import json
    
    from windmill_api.api.script import get_script_by_path, create_script
    from windmill_api.models.create_script_json_body import CreateScriptJsonBody
    from windmill_api.models.create_script_json_body_schema import (
        CreateScriptJsonBodySchema,
    )
    from windmill_api.models.update_resource_json_body import UpdateResourceJsonBody
    from windmill_api.models.create_resource_json_body import CreateResourceJsonBody
    
    from windmill_api.models.update_resource_type_json_body import (
        UpdateResourceTypeJsonBody,
    )
    
    from windmill_api.models.create_resource_type_json_body import (
        CreateResourceTypeJsonBody,
    )
    from windmill_api.api.resource import (
        create_resource,
        create_resource_type,
        update_resource,
        update_resource_type,
        get_resource,
        get_resource_type,
    )
    
    
    client = wmill.Client()
    
    SCRIPTS_PREFIX = "scripts/"
    RESOURCES_PREFIX = "resources/"
    RESOURCE_TYPES_PREFIX = "resource_types/"
    
    
    def main(tarball: bytes, dry_run: bool = False):
        print("Tarball of size: {} bytes".format(len(tarball)))
        io_bytes = io.BytesIO(tarball)
        tar = tarfile.open(fileobj=io_bytes, mode="r")
        names = tar.getnames()
    
        for m in tar.getmembers():
            if m.name.startswith(SCRIPTS_PREFIX) and m.name.endswith(".py"):
                path = m.name[len(SCRIPTS_PREFIX) : -len(".py")]
                print("Processing script {}, {}".format(path, m.name))
                get_script_response = get_script_by_path.sync_detailed(
                    workspace=client.workspace, path=path, client=client.client
                )
    
                json_tar_path = "{}.json".format(m.name[: -len(".py")])
                has_json = json_tar_path in names
    
                parent_hash = None
                summary = None
                description = None
                is_template = None
                schema = None
    
                content = tar.extractfile(m).read().decode("utf-8")
    
                if get_script_response.status_code == 200:
                    old_script = json.loads(get_script_response.content)
    
                    if old_script["content"] == content:
                        if not has_json:
                            print(
                                "same content and no metadata for this script in tarball, no need to update"
                            )
                            continue
    
                        json_content = tar.extractfile(tar.getmember(json_tar_path)).read()
                        metadata = json.loads(json_content)
    
                        if (
                            old_script.get("summary") == metadata.get("summary")
                            and old_script.get("description") == metadata.get("description")
                            and old_script.get("is_template") == metadata.get("is_template")
                            and old_script.get("schema") == metadata.get("schema")
                            and old_script.get("lock") == metadata.get("lock")
                        ):
                            print("same content and no metadata, no need to update")
                            continue
    
                    parent_hash = old_script["hash"]
                    summary = old_script.get("summary")
                    description = old_script.get("description")
                    is_template = old_script.get("is_template")
                    schema = old_script.get("schema")
                    lock = None
    
                if has_json:
                    json_content = tar.extractfile(tar.getmember(json_tar_path)).read()
                    metadata = json.loads(json_content)
    
                    summary = metadata.get("summary")
                    description = metadata.get("description")
                    is_template = metadata.get("is_template")
                    schema = metadata.get("schema")
                    lock = metadata.get("lock")
                    if lock == []:
                        lock = None
    
                if schema:
                    schema = CreateScriptJsonBodySchema.from_dict(schema)
    
                print("Uploading new version of script at path: {}".format(path))
    
                if dry_run:
                    print("Skipped because dry-run")
                else:
                    r = create_script.sync_detailed(
                        client.workspace,
                        client=client.client,
                        json_body=CreateScriptJsonBody(
                            content=content,
                            path=path,
                            parent_hash=parent_hash,
                            summary=summary,
                            description=description,
                            is_template=is_template,
                            schema=schema,
                            lock=lock,
                        ),
                    )
                    print(r.content)
            if m.name.startswith(RESOURCES_PREFIX) and m.name.endswith(".json"):
                path = m.name[len(RESOURCES_PREFIX) : -len(".json")]
                print("Processing resource {}, {}".format(path, m.name))
                get_resource_response = get_resource.sync_detailed(
                    workspace=client.workspace, path=path, client=client.client
                )
                content = tar.extractfile(m).read().decode("utf-8")
                resource = json.loads(content)
                if get_resource_response.status_code == 200:
                    old_resource = json.loads(get_resource_response.content)
                    if resource["value"] != old_resource["value"]:
                        print("Updating existing resource")
                        r = update_resource.sync_detailed(
                            client.workspace,
                            path=path,
                            json_body=UpdateResourceJsonBody.from_dict(resource),
                            client=client.client,
                        )
                        print(r)
                    else:
                        print("Skipping updating identical resource")
                else:
                    print("Creating new resource")
                    r = create_resource.sync_detailed(
                        client.workspace,
                        json_body=CreateResourceJsonBody.from_dict(resource),
                        client=client.client,
                    )
                    print(r)
    
            if m.name.startswith(RESOURCE_TYPES_PREFIX) and m.name.endswith(".json"):
                path = m.name[len(RESOURCE_TYPES_PREFIX) : -len(".json")]
                print("Processing resource type {}, {}".format(path, m.name))
                get_resource_response = get_resource_type.sync_detailed(
                    workspace=client.workspace, path=path, client=client.client
                )
                content = tar.extractfile(m).read().decode("utf-8")
                resource = json.loads(content)
                print(resource)
                if get_resource_response.status_code == 200:
                    old_resource_type = json.loads(get_resource_response.content)
                    if resource["schema"] != old_resource_type["schema"]:
                        print("Updating existing resource type")
                        r = update_resource_type.sync_detailed(
                            client.workspace,
                            path=path,
                            json_body=UpdateResourceTypeJsonBody.from_dict(resource),
                            client=client.client,
                        )
                        print(r)
                    else:
                        print("Skipping updating identical resource type")
                else:
                    print("Creating new resource type")
                    r = create_resource_type.sync_detailed(
                        client.workspace,
                        json_body=CreateResourceTypeJsonBody.from_dict(resource),
                        client=client.client,
                    )
                    print(r)
    

    Submitted by admin 1327 days ago