""" Panopta Countermeasures plugin base class Copyright 2017, Panopta LLC admin@panopta.com """ from datetime import datetime import itertools import logging import os import agent_util class CountermeasurePlugin: name = "Base Countermeasure" textkey = "base" description = "" wall_announce_delay = None max_frequency = None max_runtime = None sudo_requirements = [] author = None def __init__(self): self.output = [] self.return_code = None self.log = logging.getLogger("countermeasure") self.metadata = {} def set_metadata(self, metadata): self.metadata = metadata def execute(self, cmd, timeout=None, block=True): """ Execute a command, optionally with a timeout (in seconds) after which point it is killed off. Returns a tuple of (returncode, output). """ existing_path = os.environ.get('PATH') if '/usr/sbin' not in existing_path: existing_path += ':/usr/sbin' env = {'PATH': existing_path} else: env = None ret = agent_util.execute_command(cmd, timeout=timeout or self.max_runtime, block=block, env=env) if ret is None: return (None, None) return (ret[0], ret[1]) def which(self, program): """ Determine if a given program is available and exexcutable. If found, return the program name """ return agent_util.which(program) def validate(self): """ Optional method to perform validation on the plugin's setup. This is called by the command-line tool's "validate-plugins" command. Mainly used by helper subclasses that intend to have some additional properties overridden. Should return nothing if the plugin is valid, or a string describing validation issues if there are problems. """ pass def prepare(self): """ Optional method to be run before execution, for any initial setup or validation that the countermeasure action needs to perform. 
""" pass def run(self): """ Execute the countermeasure action """ raise NotImplemented def save_text_output(self, output): """ Save countermeasure output as plain text for later publishing up to the Panopta cloud """ self.output.append({"timestamp": datetime.utcnow().strftime( "%Y-%m-%d %H:%M:%S"), "format": "text", "output": output}) def save_html_output(self, output): """ Save countermeasure output as formatted HTML for later publishing up to the Panopta cloud """ self.output.append({"timestamp": datetime.utcnow().strftime( "%Y-%m-%d %H:%M:%S"), "format": "html", "output": output}) def save_return_code(self, return_code): """ Save the return code from the countermeasure execution """ self.return_code = return_code class JsonPlugin(CountermeasurePlugin): def __init__(self, command): self.output = [] self.return_code = None self.log = logging.getLogger("countermeasure") self.metadata = {} self.command = command def run(self): max_runtime = 45 if self.max_runtime: max_runtime = self.max_runtime return_code, output = self.execute(self.command, timeout=max_runtime) self.save_text_output(output) self.save_return_code(return_code)