AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 3.137.200.58
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/plugins/nodejs.py
import agent_util
import re
try:
    import json
except:
    import simplejson as json
from agent_util import float

from os.path import isfile


class NodeJSPlugin(agent_util.Plugin):
    """
    NodeJS checking plugin for the
    panopta agent.
    """
    textkey = 'nodejs'
    label = 'NodeJS'

    _node = agent_util.which('nodejs') and 'nodejs' or 'node'

    @classmethod
    def get_metadata(self, config):
        if NodeJSPlugin._node_executable(config):
            status = agent_util.SUPPORTED
        else:
            status = agent_util.UNSUPPORTED
        if status == agent_util.UNSUPPORTED:
            msg = "Error finding a valid nodejs application."
            return {}
        else:
            msg = None
        found_keys = ['resident_set_size', 'heap_total', 'heap_used']
        metadata = {}
        for key in found_keys:
            unit = "MB"
            metadata[key] = {
                'label': key.replace('_', ' ').capitalize(),
                'options': None,
                'status': status,
                'msg': msg,
                'unit': unit
            }
        metadata['cluster_workers'] = {
            'label': 'Cluster workers',
            'options': None,
            'status': status,
            'msg': msg,
            'unit': 'worker count'
        }
        metadata['high_resolution_time'] = {
            'label': 'High resolution time',
            'options': None,
            'status': status,
            'msg': msg,
            'unit': 'seconds'
        }
        return metadata

    @staticmethod
    def _node_executable(config):
        """
        Run a simple command to make sure that the
        nodejs executable is available to the system.
        """
        custom_path = config.get("node_binary_location", None)
        if custom_path:
            return isfile(custom_path) and agent_util.which(custom_path)
        return agent_util.which(NodeJSPlugin._node)

    def _retrieve_heap_data(self):
        """
        Make a pass of retrieving the heap size of
        the V8.
        """
        heap_stats = "require('v8').getHeapStatistics()"
        return self._eval_node(heap_stats)

    def _eval_node(self, instruction):
        """
        Evaluate the passed instruction in node. All
        instructions are included with a console log
        statement to retrieve the passed information.
        """
        node_executable = NodeJSPlugin._node_executable(self.config)
        eval_command = """
        %s -p "%s"
        """ % (node_executable, instruction)
        result = agent_util.execute_command(eval_command)
        if result[0] == 0:
            return result[1]
        else:
            return 0

    def _retrieve_entry_from_data(self, data, value):
        """
        Retrieve a single value of the heap data
        returned by nodejs.
        """
        expr = r"%s: (\d+)" % (str(value))
        result = re.findall(expr, data)
        if result:
            return float(result[0])

    def _retrieve_workers_data(self):
        """
        Retrieve the result of gettings the workers.
        """
        instruction = "require('cluster').workers"
        worker_obj = self._eval_node(instruction)
        if not worker_obj:
            self.log.error("node returned unexpected output. Is the binary location correct?")
            return None
        expr = r'(\S+):'
        result = re.findall(expr, worker_obj)
        return len(result)

    def _retrieve_high_resolution_time(self):
        """
        Retrieve the high resolution time in
        seconds.
        """
        instruction = "process.hrtime()"
        result = self._eval_node(instruction)
        if not result:
            self.log.error("node returned unexpected output. Is the binary location correct?")
            return None
        expr = r'\[ (\d+),'
        seconds = re.findall(expr, result)
        if seconds:
            return float(seconds[0])
        else:
            return None

    def _retrieve_memory_data(self):
        """
        Retrieve the memory data of NodeJS process.
        """
        instruction = "process.memoryUsage()"
        result = self._eval_node(instruction)
        return result

    def _retrieve_resident_set_size(self):
        """
        Extract the resident set size from the process
        memoryUsage.
        """
        data = self._retrieve_memory_data()
        if not data:
            self.log.error("node returned unexpected output. Is the binary location correct?")
            return None
        value = self._find_value('rss', data)
        value = value and self._convert_bytes_to_mb(value) or None
        return value

    def _retrieve_heap_total(self):
        """
        Return the heap total.
        """
        data = self._retrieve_memory_data()
        if not data:
            self.log.error("node returned unexpected output. Is the binary location correct?")
            return None
        value = self._find_value('heapTotal', data)
        value = value and self._convert_bytes_to_mb(value) or None
        return value

    def _retrieve_heap_used(self):
        """
        Return the heap used.
        """
        data = self._retrieve_memory_data()
        if not data:
            self.log.error("node returned unexpected output. Is the binary location correct?")
            return None
        value = self._find_value('heapUsed', data)
        value = value and self._convert_bytes_to_mb(value) or None
        return value

    def _convert_bytes_to_mb(self, value):
        """
        Peform a quick conversion to mb.
        """
        return float(value) / (2**20)

    def _find_value(self, target, data):
        """
        Find the target in the passed data string
        as a javascript object.
        """
        expr = r'%s: (\d+)' % (str(target))
        result = re.findall(expr, data)
        return result and result[0] or None

    def check(self, textkey, data, config):
        self.config = config
        if textkey == 'cluster_workers':
            return self._retrieve_workers_data()
        elif textkey == 'high_resolution_time':
            return self._retrieve_high_resolution_time()
        elif textkey == 'resident_set_size':
            return self._retrieve_resident_set_size()
        elif textkey == 'heap_total':
            return self._retrieve_heap_total()
        else:
            return self._retrieve_heap_used()

Anon7 - 2022
AnonSec Team