import requests
import json
class TokenParams:
def __init__(self, host, username, password, client_id, realm):
self.host = host
self.username = username
self.password = password
self.client_id = client_id
self.realm = realm
class TokenResponse:
def __init__(self, access_token=None, **kwargs):
self.access_token = access_token
self.__dict__.update(kwargs)
def get_access_token(params: TokenParams) -> str:
url = f"https://{params.host}/auth/realms/{params.realm}/protocol/openid-connect/token"
data = {
"grant_type": "password",
"username": params.username,
"password": params.password,
"client_id": params.client_id,
}
headers = {
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"cache-control": "no-cache",
}
response = requests.post(url, data=data, headers=headers)
if response.status_code != 200:
raise Exception(f"HTTP error! status: {response.status_code}")
data = response.json()
token_response = TokenResponse(**data)
token = token_response.access_token
if not token:
raise Exception("Could not get access token")
return token
def get_email(responsible_id: str, token: str, api_host: str) -> str:
url = f"https://{api_host}/veo/persons/{responsible_id}"
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
response = requests.get(url, headers=headers)
if response.status_code != 200:
raise Exception(
f"HTTP error when fetching person object: {response.status_code}"
)
person_data = response.json()
try:
email = (
person_data.get("customAspects", {})
.get("person_contactInformation", {})
.get("attributes", {})
.get("person_contactInformation_email")
)
except KeyError:
raise Exception("Email field not found in person object.")
return email
def main(
control_implementations: list,
user: str,
passw: str,
oidc_client: str,
realm: str = "verinice-sandbox",
api_host: str = "api.sandbox.verinice.com",
):
token = get_access_token(
TokenParams(
host="auth.verinice.com",
username=user,
password=passw,
client_id=oidc_client,
realm=realm,
)
)
print(json.dumps(control_implementations, indent=2))
all_items = []
for ci in control_implementations:
items = ci.get("items", [])
print(items)
for item in items:
print(item)
url = item.get("_requirementImplementations", "")
parent_control_id = item.get("control", {}).get("id")
print(url)
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
response = requests.get(url, headers=headers)
if response.status_code != 200:
raise Exception(f"HTTP error! status: {response.status_code}")
json_data = response.json()
filtered_items = []
for entry in json_data.get("items", []):
if "responsible" in entry:
filtered_item = {
"parent_control_id": parent_control_id,
"control_displayName": entry.get("control", {}).get(
"displayName"
),
"control_id": entry.get("control", {}).get("id"),
"responsible_displayName": entry.get("responsible", {}).get(
"displayName"
),
"responsible_id": entry.get("responsible", {}).get("id"),
"responsible_targetUri": entry.get("responsible", {}).get(
"targetUri"
),
"origin_id": entry.get("origin", {}).get("id"),
"origin_displayName": entry.get("origin", {}).get(
"displayName"
),
"origin_type": entry.get("origin", {}).get("type"),
"person_email": get_email(
entry.get("responsible", {}).get("id"),
token,
api_host
),
}
filtered_items.append(filtered_item)
print(json.dumps(filtered_items, indent=2))
all_items.extend(filtered_items)
return all_items
Submitted by alexander koderman64 451 days ago