11from time import time
22from typing import Tuple , Union
3- from uuid import uuid4
4-
5- import structlog
63
74from publisher .drivers .event_source import EventSource
8- from publisher .drivers .file_source import FileSource
9- from publisher .entities .events import Event
10- from publisher .entities .guardrail import (
5+ from publisher .entities .events import (
6+ Event ,
117 GuardrailActivated ,
12- GuardrailActivatedDetail ,
138 GuardrailPassed ,
14- GuardrailPassedDetail ,
159)
1610
17- LOGGER = structlog .get_logger (__name__ )
18-
1911
20- class Checkov (EventSource ):
21- def __init__ (self ) -> None :
22- self .file_source = FileSource ()
23-
24- def get_events (self , file : str ) -> Union [Exception , Tuple [Event ]]:
12+ class Checkov (EventSource ):
13+ def get_events (self , file_data : dict , repo_name : str ) -> Union [Exception , Tuple [Event ]]:
2514 current_time = int (time ())
26- data = self .file_source .read_file (file )
27- if isinstance (data , Exception ):
28- LOGGER .error (f"Unable to read Checkov results file: { file } " , exception = str (data ))
29- return data
3015 events = []
31- if "results" in data :
32- for result in data ["results" ]["passed_checks" ]:
33- events .append (GuardrailPassed (
34- source = "contino.custom" ,
35- detail_type = "Checkov Guardrail Passed" ,
36- detail = GuardrailPassedDetail (
37- aggregate_id = result ["resource" ], # Will need a better way of getting resource id, current method is not live id
38- guardrail_id = result ["check_id" ],
39- time = current_time ,
40- )
41- ))
42- for result in data ["results" ]["failed_checks" ]:
16+ if "results" in file_data :
17+ for result in file_data ["results" ]["passed_checks" ]:
18+ events .append (GuardrailPassed (
19+ aggregate_id = repo_name + "." + result ["resource" ],
20+ guardrail_id = result ["check_id" ],
21+ time = current_time ,
22+ ))
23+ for result in file_data ["results" ]["failed_checks" ]:
4324 events .append (GuardrailActivated (
44- source = "contino.custom" ,
45- detail_type = "Checkov Guardrail Activated" ,
46- detail = GuardrailActivatedDetail (
47- aggregate_id = result ["resource" ], # Will need a better way of getting resource id, current method is not live id
48- guardrail_id = result ["check_id" ],
49- time = current_time ,
50- )
25+ aggregate_id = repo_name + "." + result ["resource" ],
26+ guardrail_id = result ["check_id" ],
27+ time = current_time ,
5128 ))
5229 return tuple (events )
53- LOGGER .error (f"Unable to read Checkov results from file: { file } " )
54- return Exception (f"Unable to read Checkov results from file: { file } " )
30+ return Exception (f"Unable to read Checkov results from file" )
5531
0 commit comments