import requests
import json
class TokenParams:
def __init__(self, host, username, password, client_id, realm):
self.host = host
self.username = username
self.password = password
self.client_id = client_id
self.realm = realm
class TokenResponse:
def __init__(self, access_token=None, **kwargs):
self.access_token = access_token
self.__dict__.update(kwargs)
def get_access_token(params: TokenParams) -> str:
url = f"https://{params.host}/auth/realms/{params.realm}/protocol/openid-connect/token"
data = {
"grant_type": "password",
"username": params.username,
"password": params.password,
"client_id": params.client_id,
}
headers = {
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"cache-control": "no-cache",
}
response = requests.post(url, data=data, headers=headers)
if response.status_code != 200:
raise Exception(f"HTTP error! status: {response.status_code}")
data = response.json()
token_response = TokenResponse(**data)
token = token_response.access_token
if not token:
raise Exception("Could not get access token")
return token
def main(
server_json: str,
existing_asset_json: str,
domain_id: str,
user="sandboxuser",
passw=None,
oidc_client="veo-sandbox",
realm="verinice-sandbox",
api_host="api.sandbox.verinice.com",
):
token = get_access_token(
TokenParams(
host="auth.verinice.com",
username=user,
password=passw,
client_id=oidc_client,
realm=realm,
)
)
# print existing_asset_json formatted:
print(json.dumps(existing_asset_json, indent=2))
# print server_json formatted:
print(json.dumps(server_json, indent=2))
existing_asset_json = existing_asset_json.get("items", [])[0]
asset_id = existing_asset_json.get("id")
# Update fields in the existing asset JSON with new values
existing_asset_json["name"] = server_json["name"]
existing_asset_json["abbreviation"] = server_json["abbreviation"]
existing_asset_json["description"] = server_json["description"]
# Update custom aspects if they exist or create them if they don't
custom_aspects = existing_asset_json.get("customAspects", {})
# asset_details update
asset_details = custom_aspects.get("asset_details", {})
asset_details["asset_details_operatingStage"] = server_json[
"asset_details_operatingStage"
]
custom_aspects["asset_details"] = asset_details
# asset_interfaceDetails update
asset_interface_details = custom_aspects.get("asset_interfaceDetails", {})
asset_interface_details["asset_interfaceDetails_networkAddresses"] = server_json[
"asset_interfaceDetails_networkAddresses"
]
custom_aspects["asset_interfaceDetails"] = asset_interface_details
existing_asset_json["customAspects"] = custom_aspects
# get ETag to prepare for PUT request
res = requests.get(
f"https://{api_host}/veo/domains/{domain_id}/assets/{asset_id}",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
etag = res.headers.get("ETag")
res = requests.put(
f"https://{api_host}/veo/domains/{domain_id}/assets/{asset_id}",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
"If-Match": etag,
},
json=existing_asset_json,
)
return res.json()
Submitted by alexander koderman64 451 days ago