1 | import requests |
2 | import json |
3 |
|
4 |
|
5 | class TokenParams: |
6 | def __init__(self, host, username, password, client_id, realm): |
7 | self.host = host |
8 | self.username = username |
9 | self.password = password |
10 | self.client_id = client_id |
11 | self.realm = realm |
12 |
|
13 |
|
14 | class TokenResponse: |
15 | def __init__(self, access_token=None, **kwargs): |
16 | self.access_token = access_token |
17 | self.__dict__.update(kwargs) |
18 |
|
19 |
|
20 | def get_access_token(params: TokenParams) -> str: |
21 | url = f"https://{params.host}/auth/realms/{params.realm}/protocol/openid-connect/token" |
22 |
|
23 | data = { |
24 | "grant_type": "password", |
25 | "username": params.username, |
26 | "password": params.password, |
27 | "client_id": params.client_id, |
28 | } |
29 |
|
30 | headers = { |
31 | "Accept": "application/json", |
32 | "Content-Type": "application/x-www-form-urlencoded", |
33 | "cache-control": "no-cache", |
34 | } |
35 |
|
36 | response = requests.post(url, data=data, headers=headers) |
37 |
|
38 | if response.status_code != 200: |
39 | raise Exception(f"HTTP error! status: {response.status_code}") |
40 |
|
41 | data = response.json() |
42 | token_response = TokenResponse(**data) |
43 | token = token_response.access_token |
44 | if not token: |
45 | raise Exception("Could not get access token") |
46 | return token |
47 |
|
48 |
|
49 | def main( |
50 | incident_json: str, |
51 | unit_id: str, |
52 | domain_id: str, |
53 | user="sandboxuser", |
54 | passw=None, |
55 | oidc_client="veo-sandbox", |
56 | oidc_realm="verinice-sandbox", |
57 | api_host="api.sandbox.verinice.com" |
58 | ): |
59 | token = get_access_token( |
60 | TokenParams( |
61 | host="auth.verinice.com", |
62 | username=user, |
63 | password=passw, |
64 | client_id=oidc_client, |
65 | realm=oidc_realm |
66 | ) |
67 | ) |
68 |
|
69 | request_body = { |
70 | "name": incident_json["Kurzbeschreibung des Vorfalls"], |
71 | "abbreviation": incident_json["abbreviation"], |
72 | "description": incident_json["description"], |
73 | "owner": { |
74 | # Update the targetUri as needed. |
75 | "targetUri": f"https://{api_host}/veo/units/{unit_id}" |
76 | }, |
77 | "subType": "INC_SecurityIncident", |
78 | "status": "NEW", |
79 | "customAspects": { |
80 | "incident_security": incident_json["incident_security"], |
81 | "incident_description": incident_json["incident_description"], |
82 | "incident_generalInformation": { |
83 | "incident_generalInformation_document": incident_json["document"] |
84 | } |
85 | }, |
86 | } |
87 |
|
88 | res = requests.post( |
89 | f"https://{api_host}/veo/domains/{domain_id}/incidents", |
90 | headers={ |
91 | "Content-Type": "application/json", |
92 | "Authorization": f"Bearer {token}", |
93 | }, |
94 | json=request_body, |
95 | ) |
96 | return res.json() |
97 |
|