1 | import requests |
2 | import json |
3 |
|
4 |
|
5 | class TokenParams: |
6 | def __init__(self, host, username, password, client_id, realm): |
7 | self.host = host |
8 | self.username = username |
9 | self.password = password |
10 | self.client_id = client_id |
11 | self.realm = realm |
12 |
|
13 |
|
14 | class TokenResponse: |
15 | def __init__(self, access_token=None, **kwargs): |
16 | self.access_token = access_token |
17 | self.__dict__.update(kwargs) |
18 |
|
19 |
|
20 | def get_access_token(params: TokenParams) -> str: |
21 | url = f"https://{params.host}/auth/realms/{params.realm}/protocol/openid-connect/token" |
22 |
|
23 | data = { |
24 | "grant_type": "password", |
25 | "username": params.username, |
26 | "password": params.password, |
27 | "client_id": params.client_id, |
28 | } |
29 |
|
30 | headers = { |
31 | "Accept": "application/json", |
32 | "Content-Type": "application/x-www-form-urlencoded", |
33 | "cache-control": "no-cache", |
34 | } |
35 |
|
36 | response = requests.post(url, data=data, headers=headers) |
37 |
|
38 | if response.status_code != 200: |
39 | raise Exception(f"HTTP error! status: {response.status_code}") |
40 |
|
41 | data = response.json() |
42 | token_response = TokenResponse(**data) |
43 | token = token_response.access_token |
44 | if not token: |
45 | raise Exception("Could not get access token") |
46 | return token |
47 |
|
48 |
|
49 | def get_email(responsible_id: str, token: str, api_host: str) -> str: |
50 | url = f"https://{api_host}/veo/persons/{responsible_id}" |
51 |
|
52 | headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} |
53 | response = requests.get(url, headers=headers) |
54 | if response.status_code != 200: |
55 | raise Exception( |
56 | f"HTTP error when fetching person object: {response.status_code}" |
57 | ) |
58 |
|
59 | person_data = response.json() |
60 | try: |
61 | email = ( |
62 | person_data.get("customAspects", {}) |
63 | .get("person_contactInformation", {}) |
64 | .get("attributes", {}) |
65 | .get("person_contactInformation_email") |
66 | ) |
67 | except KeyError: |
68 | raise Exception("Email field not found in person object.") |
69 |
|
70 | return email |
71 |
|
72 |
|
73 | def main( |
74 | control_implementations: list, |
75 | user: str, |
76 | passw: str, |
77 | oidc_client: str, |
78 | realm: str = "verinice-sandbox", |
79 | api_host: str = "api.sandbox.verinice.com", |
80 | ): |
81 | token = get_access_token( |
82 | TokenParams( |
83 | host="auth.verinice.com", |
84 | username=user, |
85 | password=passw, |
86 | client_id=oidc_client, |
87 | realm=realm, |
88 | ) |
89 | ) |
90 | print(json.dumps(control_implementations, indent=2)) |
91 |
|
92 | all_items = [] |
93 | for ci in control_implementations: |
94 | items = ci.get("items", []) |
95 | print(items) |
96 | for item in items: |
97 | print(item) |
98 | url = item.get("_requirementImplementations", "") |
99 | parent_control_id = item.get("control", {}).get("id") |
100 | print(url) |
101 | headers = { |
102 | "Authorization": f"Bearer {token}", |
103 | "Accept": "application/json", |
104 | } |
105 | response = requests.get(url, headers=headers) |
106 | if response.status_code != 200: |
107 | raise Exception(f"HTTP error! status: {response.status_code}") |
108 |
|
109 | json_data = response.json() |
110 | filtered_items = [] |
111 | for entry in json_data.get("items", []): |
112 | if "responsible" in entry: |
113 | filtered_item = { |
114 | "parent_control_id": parent_control_id, |
115 | "control_displayName": entry.get("control", {}).get( |
116 | "displayName" |
117 | ), |
118 | "control_id": entry.get("control", {}).get("id"), |
119 | "responsible_displayName": entry.get("responsible", {}).get( |
120 | "displayName" |
121 | ), |
122 | "responsible_id": entry.get("responsible", {}).get("id"), |
123 | "responsible_targetUri": entry.get("responsible", {}).get( |
124 | "targetUri" |
125 | ), |
126 | "origin_id": entry.get("origin", {}).get("id"), |
127 | "origin_displayName": entry.get("origin", {}).get( |
128 | "displayName" |
129 | ), |
130 | "origin_type": entry.get("origin", {}).get("type"), |
131 | "person_email": get_email( |
132 | entry.get("responsible", {}).get("id"), |
133 | token, |
134 | api_host |
135 | ), |
136 | } |
137 | filtered_items.append(filtered_item) |
138 | print(json.dumps(filtered_items, indent=2)) |
139 | all_items.extend(filtered_items) |
140 | return all_items |
141 |
|