-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocessor.py
More file actions
31 lines (28 loc) · 1.1 KB
/
processor.py
File metadata and controls
31 lines (28 loc) · 1.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import os, time, json, requests, redis
REDIS_URL = os.getenv('REDIS_URL','redis://localhost:6379/0')
r = redis.from_url(REDIS_URL)
MODEL_URL = os.getenv('MODEL_URL','http://localhost:8000/predict')
def extract_features(event):
vals = [float(event.get('amount',0))/1000.0, float(event.get('duration',0))/60.0, int(event.get('is_internal',0))]
arr = vals + [0]*(10-len(vals))
return arr[:10]
def main():
print('Processor listening on events list...')
while True:
item = r.blpop('events', timeout=5)
if not item:
time.sleep(0.5); continue
_, raw = item
event = json.loads(raw)
features = extract_features(event)
try:
resp = requests.post(MODEL_URL, json={'features': features}, timeout=5)
except Exception as e:
print('Model call error', e); continue
if resp.ok:
out = resp.json(); score = out.get('anomaly_score',0)
if score>0.05:
r.rpush('alerts', json.dumps({'score': score, 'event': event}))
print('ALERT', score)
if __name__=='__main__':
main()