AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 18.225.98.116
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/library/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/library/plugin_driver.py
import os
import sys
import time
import logging
import optparse
from plugin_manager import PluginManager
from schedule import Schedule
import container_discovery


class PluginDriver(object):
    """Command-line driver that runs one plugin check and logs its result.

    The driver parses command-line options, configures logging, optionally
    resolves a docker container, loads plugins through ``PluginManager`` and
    then calls ``Schedule.check`` ``--count`` times, sleeping ``--interval``
    seconds between runs.
    """

    # Format applied to the stream handler installed by set_up_logging().
    LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    def __init__(self, brand, pkg_dir, base_config_dir, base_custom_plugin_dir):
        """Compute the default config file and custom plugin directory.

        brand -- used to build the config file name ``<brand>_agent.cfg``.
        pkg_dir -- sub-directory joined onto both base directories.
        base_config_dir -- directory under which ``pkg_dir`` holds the config.
        base_custom_plugin_dir -- directory under which ``pkg_dir`` holds
            custom plugins.
        """
        self.config_file = os.path.join(base_config_dir, pkg_dir, "%s_agent.cfg" % brand)
        self.custom_plugin_dir = os.path.join(base_custom_plugin_dir, pkg_dir)
        # Create the logger up front so methods such as get_container() can
        # be called before set_up_logging() without an AttributeError.
        self.log = logging.getLogger(self.__class__.__name__)

    def set_up_logging(self, verbose):
        """Attach a stream handler to the root logger and set log levels.

        verbose -- when true the root logger is set to DEBUG; otherwise the
            root logger is set to WARNING and this driver's logger to INFO.
        """
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(self.LOG_FORMAT))

        root_logger = logging.getLogger()
        root_logger.addHandler(handler)

        self.log = logging.getLogger(self.__class__.__name__)

        if verbose:
            root_logger.setLevel(logging.DEBUG)
        else:
            root_logger.setLevel(logging.WARNING)
            self.log.setLevel(logging.INFO)

    def parse_arguments(self, args=None):
        """Parse and validate command-line options.

        args -- optional list of argument strings; when None, optparse reads
            ``sys.argv[1:]`` (the original behaviour).

        Returns the optparse options object. Prints the help text and exits
        with status 1 when the plugin or resource textkey is missing; exits
        through ``parser.error`` when ``--interval`` is negative.
        """
        parser = optparse.OptionParser()
        parser.add_option("-p", "--plugin-textkey", action="store", dest="plugin_textkey")
        parser.add_option("-r", "--resource-textkey", action="store", dest="resource_textkey")
        parser.add_option("-o", "--option", action="store", dest="option")
        parser.add_option("-d", "--plugin-directory", action="store", dest="plugin_directory", default=self.custom_plugin_dir, help="default %s" % self.custom_plugin_dir)
        parser.add_option("-c", "--config-file", action="store", dest="config_file", default=self.config_file, help="default %s" % self.config_file)
        parser.add_option("--container-id", action="store", dest="container_id", help="docker container id to check")
        parser.add_option("--count", action="store", dest="count", type="int", default=1, help="number of checks run, default 1")
        parser.add_option("--interval", action="store", dest="interval", type="int", default=1, help="interval between checks, default 1 second")
        parser.add_option("-v", "--verbose", action="store_true", dest="verbose")

        options, _ = parser.parse_args(args)

        # validate required arguments
        if not options.plugin_textkey or not options.resource_textkey:
            print("Specify plugin-textkey and resource-textkey\n")
            parser.print_help()
            sys.exit(1)

        # time.sleep() rejects negative values; fail early with a clear
        # message instead of a traceback after the first check has run.
        if options.interval < 0:
            parser.error("--interval must be non-negative")

        return options

    def get_container(self, container_id):
        """Look up a docker container by id.

        Returns whatever ``container_discovery.find_docker_container``
        returns for ``container_id``, or None (after logging an error) when
        docker access is not reported as "success" or nothing is found.
        """
        can_access_docker = container_discovery.check_access()
        if can_access_docker != "success":
            self.log.error("cannot access to docker: %s", can_access_docker)
            return None

        found_container = container_discovery.find_docker_container(container_id)
        if not found_container:
            self.log.error("container %s not found", container_id)
            return None

        return found_container

    @staticmethod
    def _encode_option(option):
        """Return ``option`` encoded as a JSON string literal.

        json.dumps escapes quotes and backslashes, so the result stays valid
        JSON for any option text; simply wrapping the text in double quotes
        does not.
        """
        import json  # local import: keeps the file's import block untouched

        return json.dumps(option)

    def main(self):
        """Run the requested plugin check ``--count`` times and log results.

        Returns None in all cases; errors (container lookup failure, unknown
        plugin) are logged before returning early.
        """
        # parse arguments
        options = self.parse_arguments()

        # set up logging
        self.set_up_logging(options.verbose)

        # check config file (only a warning: execution continues without it)
        if not os.path.exists(options.config_file):
            self.log.warning("config file '%s' not exist", options.config_file)

        # Encode the option as a JSON string; per the original note the
        # schedule is expected to json.loads() this value.
        if options.option:
            options.option = self._encode_option(options.option)

        db = {}
        server_key = None

        # docker container
        if options.container_id:
            container = self.get_container(options.container_id)
            if not container:
                return

            db["docker_containers"] = {
                options.container_id: container
            }
            server_key = ":" + options.container_id

        # load plugins
        plugin_manager = PluginManager(db, options.config_file, options.plugin_directory)
        if options.plugin_textkey not in plugin_manager.plugins:
            self.log.error("Plugin %s not found", options.plugin_textkey)
            return

        # create schedule to call check
        schedule = Schedule({
            "id": 1,
            "plugin_textkey": options.plugin_textkey,
            "resource_textkey": options.resource_textkey,
            "option": options.option,
            "server_key": server_key,
            "frequency": 60,
            "thresholds": []
        })

        for i in range(options.count):
            # run plugin
            value, _ = schedule.check(plugin_manager, {})

            self.log.info("%s.%s returned %s", schedule.plugin_textkey, schedule.resource_textkey, value)
            if schedule.cached_results:
                self.log.info("  cached_results: %s", schedule.cached_results)

            # if this is not the last turn, sleep options.interval seconds
            if i < options.count - 1:
                time.sleep(options.interval)

Anon7 - 2022
AnonSec Team