1 | import requests |
2 | import json |
3 |
|
4 |
|
5 | class TokenParams: |
6 | def __init__(self, host, username, password, client_id, realm): |
7 | self.host = host |
8 | self.username = username |
9 | self.password = password |
10 | self.client_id = client_id |
11 | self.realm = realm |
12 |
|
13 |
|
14 | class TokenResponse: |
15 | def __init__(self, access_token=None, **kwargs): |
16 | self.access_token = access_token |
17 | self.__dict__.update(kwargs) |
18 |
|
19 |
|
20 | def get_access_token(params: TokenParams) -> str: |
21 | url = f"https://{params.host}/auth/realms/{params.realm}/protocol/openid-connect/token" |
22 |
|
23 | data = { |
24 | "grant_type": "password", |
25 | "username": params.username, |
26 | "password": params.password, |
27 | "client_id": params.client_id, |
28 | } |
29 |
|
30 | headers = { |
31 | "Accept": "application/json", |
32 | "Content-Type": "application/x-www-form-urlencoded", |
33 | "cache-control": "no-cache", |
34 | } |
35 |
|
36 | response = requests.post(url, data=data, headers=headers) |
37 |
|
38 | if response.status_code != 200: |
39 | raise Exception(f"HTTP error! status: {response.status_code}") |
40 |
|
41 | data = response.json() |
42 | token_response = TokenResponse(**data) |
43 | token = token_response.access_token |
44 | if not token: |
45 | raise Exception("Could not get access token") |
46 | return token |
47 |
|
48 |
|
49 | def main( |
50 | server_json: str, |
51 | existing_asset_json: str, |
52 | domain_id: str, |
53 | user="sandboxuser", |
54 | passw=None, |
55 | oidc_client="veo-sandbox", |
56 | realm="verinice-sandbox", |
57 | api_host="api.sandbox.verinice.com", |
58 | ): |
59 | token = get_access_token( |
60 | TokenParams( |
61 | host="auth.verinice.com", |
62 | username=user, |
63 | password=passw, |
64 | client_id=oidc_client, |
65 | realm=realm, |
66 | ) |
67 | ) |
68 | # print existing_asset_json formatted: |
69 | print(json.dumps(existing_asset_json, indent=2)) |
70 |
|
71 | # print server_json formatted: |
72 | print(json.dumps(server_json, indent=2)) |
73 |
|
74 | existing_asset_json = existing_asset_json.get("items", [])[0] |
75 | asset_id = existing_asset_json.get("id") |
76 |
|
77 | # Update fields in the existing asset JSON with new values |
78 | existing_asset_json["name"] = server_json["name"] |
79 | existing_asset_json["abbreviation"] = server_json["abbreviation"] |
80 | existing_asset_json["description"] = server_json["description"] |
81 |
|
82 | # Update custom aspects if they exist or create them if they don't |
83 | custom_aspects = existing_asset_json.get("customAspects", {}) |
84 |
|
85 | # asset_details update |
86 | asset_details = custom_aspects.get("asset_details", {}) |
87 | asset_details["asset_details_operatingStage"] = server_json[ |
88 | "asset_details_operatingStage" |
89 | ] |
90 | custom_aspects["asset_details"] = asset_details |
91 |
|
92 | # asset_interfaceDetails update |
93 | asset_interface_details = custom_aspects.get("asset_interfaceDetails", {}) |
94 | asset_interface_details["asset_interfaceDetails_networkAddresses"] = server_json[ |
95 | "asset_interfaceDetails_networkAddresses" |
96 | ] |
97 | custom_aspects["asset_interfaceDetails"] = asset_interface_details |
98 |
|
99 | existing_asset_json["customAspects"] = custom_aspects |
100 |
|
101 | # get ETag to prepare for PUT request |
102 | res = requests.get( |
103 | f"https://{api_host}/veo/domains/{domain_id}/assets/{asset_id}", |
104 | headers={ |
105 | "Content-Type": "application/json", |
106 | "Authorization": f"Bearer {token}", |
107 | }, |
108 | ) |
109 | etag = res.headers.get("ETag") |
110 |
|
111 | res = requests.put( |
112 | f"https://{api_host}/veo/domains/{domain_id}/assets/{asset_id}", |
113 | headers={ |
114 | "Content-Type": "application/json", |
115 | "Authorization": f"Bearer {token}", |
116 | "If-Match": etag, |
117 | }, |
118 | json=existing_asset_json, |
119 | ) |
120 | return res.json() |
121 | |