Server IP : 92.204.138.22 / Your IP : 18.191.162.222 Web Server : Apache System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.22.1.el8_10.x86_64 #1 SMP Tue Sep 24 05:16:59 EDT 2024 x86_64 User : internationaljou ( 1019) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/panopta-agent/library/ |
Upload File : |
from agent_util import total_seconds from datetime import datetime, timedelta import logging import sys class Anomaly(object): def __init__(self, acceptable_duration, schedules_number_of_checks): self.log = logging.getLogger(self.__class__.__name__) self.log.info('New anomaly detected') now = datetime.now() self.time_first_detected = now self.time_last_detected = now self.acceptable_duration = timedelta(seconds=acceptable_duration) self.reported_as_exceeded_duration = False self.reported_as_cleared = False self.schedules_number_of_checks = schedules_number_of_checks def __repr__(self): return '<%s, %s first detected: %s, last detected: %s, duration: %s>' % ( self.__class__.__name__, self.reported_as_exceeded_duration and "PUSHED" or "WAITING", self.time_first_detected, self.time_last_detected, self.acceptable_duration, ) # The logging library interferes with cPickle, so we must remove the logger # instance then reset it when we serialize/unserialize. def __getstate__(self): state = dict(self.__dict__) del state['log'] return state def __setstate__(self, state): self.__dict__.update(state) self.log = logging.getLogger(self.__class__.__name__) def exceeds_duration(self): time_since_first_detection = (datetime.now() - self.time_first_detected) self.log.debug('Anomaly began %s and has lasted %s seconds', self.time_first_detected, total_seconds(time_since_first_detection)) return time_since_first_detection >= self.acceptable_duration def has_cleared(self, current_check_number): time_since_last_detection = datetime.now() - self.time_last_detected self.log.debug('Anomaly was last detected at %s, %s seconds ago', self.time_last_detected, total_seconds(time_since_last_detection)) return (time_since_last_detection > self.acceptable_duration and self.schedules_number_of_checks < current_check_number)