AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 3.141.38.154
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/library/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/library/schedule.py
from anomaly import Anomaly
from datetime import datetime
from threshold import Threshold
import logging
import traceback
import sys
import time
try:
    import json
except ImportError:
    try:
        import simplejson as json
    # it's possible that we may not need json for the action that we're taking.
    # for example, for the rpm post install script, on a python version that
    # doesn't have json, we'll get this far in the code.  but the post
    # install doesn't use json, so we're fine
    except ImportError:
        json = None


# This represents a check that needs to be run, and all the associated
# processes on the check result that need to occur.
class Schedule(object):
    def __init__(self, schedule_data):
        self.log = logging.getLogger(self.__class__.__name__)
        self.update(schedule_data)

        self.last_check_value = None
        self.next_check_time = datetime.now()
        self.number_of_checks = 0

        self.cached_results = {}

    # TODO: These two methods are also in Threshold and Anomaly; we should make
    # these three classes inherit from an Entity class that implements these.
    # The logging library interferes with cPickle, so we must remove the logger
    # instance then reset it when we serialize/unserialize.
    def __getstate__(self):
        state = dict(self.__dict__)
        del state['log']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.log = logging.getLogger(self.__class__.__name__)

    @classmethod
    def create_test_schedule(cls):
        # none of this stuff really matters for the individual plugin tests
        data = {
            "id": 1,
            "plugin_textkey": "",
            "resource_textkey": "",
            "option": "null",
            "frequency": 60,
            "thresholds": [{
                "id": 1,
                "delay": 0,
                "operator": "gt",
                "value": 0,
            }]
        }
        return cls(data)

    def update(self, schedule_data):
        self.id = schedule_data["id"]
        self.plugin_textkey = schedule_data["plugin_textkey"]
        self.resource_textkey = schedule_data["resource_textkey"]
        if type(schedule_data["option"]) == dict:
            self.option = schedule_data["option"]
        else:
            self.option = json.loads(schedule_data["option"] or 'null')
        self.frequency = schedule_data["frequency"]
        self.thresholds = []
        self.server_key = schedule_data.get("server_key", None)
        for threshold_data in schedule_data['thresholds']:
            try:
                self.thresholds.append(Threshold(**threshold_data))
            except:
                self.log.critical("Error storing threshold %s: %s" % (threshold_data, traceback.format_exc()))

    def __repr__(self):
        return "<Schedule %d, %s.%s, %d>" % (self.id,
                                             self.plugin_textkey,
                                             self.resource_textkey,
                                             self.number_of_checks)

    def check(self, plugin_manager, anomalies):
        value = plugin_manager.check(self)
        self.last_check_value = value
        self.number_of_checks += 1
        self.log.debug('Schedule %s, check #%d: %d', self.id, self.number_of_checks, value)
        self.detect_anomalies(value, anomalies)

        return value, anomalies

    def detect_anomalies(self, value, anomalies):
        if value is None:
            return

        for threshold in self.thresholds:
            self.log.info('Schedule %s checking value against threshold %s' % (self.id, threshold.id))
            if threshold.limit_exceeded(value):
                self.log.info('Schedule %s threshold %s - Value exceeds threshold' % (self.id, threshold.id))
                if threshold.id in anomalies:
                    anomaly = anomalies.get(threshold.id)
                    anomaly.time_last_detected = datetime.now()
                    anomaly.schedules_number_of_checks = self.number_of_checks
                else:
                    anomaly = Anomaly(threshold.duration, self.number_of_checks)
                anomalies.update({threshold.id: anomaly})
            else:
                # If we have an anomaly but it's waiting for duration, go ahead and remove it
                if threshold.id in anomalies and not anomalies[threshold.id].reported_as_exceeded_duration:
                    self.log.info('Schedule %s threshold %s - Value within threshold, removing pending anomaly' % (self.id, threshold.id))
                    del anomalies[threshold.id]

Anon7 - 2022
AnonSec Team