AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 3.21.106.240
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/plugins/apache_kafka.py
import agent_util
import logging
import traceback

logger = logging.getLogger(__name__)

### Mapping of JMX URI entries to their agent readable counterparts
def _build_jmx_mapping():
    """Build the textkey -> (MBean object name, attribute, composite key) table.

    Each value is a 3-tuple:
      * the JMX object name to query,
      * the attribute to read from that MBean,
      * the key to pull out of the attribute when it is a composite value,
        or None when the attribute itself is the metric.
    """
    mapping = {}

    # Broker throughput meters: every meter exposes the same four rates.
    broker_bean = "kafka.server:type=BrokerTopicMetrics,name=%s"
    meters = (
        ("bips", "BytesInPerSec"),
        ("bops", "BytesOutPerSec"),
        ("mips", "MessagesInPerSec"),
    )
    rates = ("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate", "MeanRate")
    for short_name, meter in meters:
        for rate in rates:
            textkey = "broker.%s.%s" % (short_name, rate.lower())
            mapping[textkey] = (broker_bean % meter, rate, None)

    # Single-valued gauges.
    mapping["underreplicatedpartitions"] = (
        "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions",
        "Value",
        None,
    )
    mapping["fetch.queue-size"] = ("kafka.server:type=Fetch", "queue-size", None)

    # JVM heap usage: one composite attribute, one textkey per field.
    for field in ("committed", "used", "max"):
        mapping["memory.heap.%s" % field] = (
            "java.lang:type=Memory",
            "HeapMemoryUsage",
            field,
        )

    return mapping


JMX_MAPPING = _build_jmx_mapping()
####

def _topic_from_object_name(name):
    """Return the value of the ``topic`` key property of a JMX object name.

    The object name is expected in ``domain:key=value,key=value`` form. The
    property is looked up by key rather than by position, so names that carry
    extra properties before ``topic`` (for example ``clientId=...``) still
    yield the real topic. Returns None when there is no ``topic`` property.
    """
    properties = name.split(':', 1)[-1]
    for prop in properties.split(','):
        key, sep, value = prop.partition('=')
        if sep and key.strip() == 'topic':
            return value
    return None


def discover_beans(connection):
    """List the MBeans the plugin cares about and the topics they mention.

    Parameters:
        connection: object exposing ``queryMBeans(None, None)``; each returned
            bean must provide ``objectName.toString()`` and ``toString()``.

    Returns:
        A ``(beans, topics)`` tuple. ``beans`` holds ``bean.toString()`` for
        every bean whose object name contains one of the allowed fragments,
        in query order. ``topics`` is the sorted list of distinct ``topic``
        property values found on those beans, minus the ignored ones.
    """
    allowed_beans = (
        'kafka.server:type=BrokerTopicMetrics',
        'kafka.server:type=ReplicaManager',
        'kafka.log:type=LogFlushStats',
        'java.lang:type=Memory',
        'kafka.server:type=Fetch',
    )
    ignored_topics = ('__consumer_offsets', 'ReplicaFetcherThread-0-2')

    discovered_beans = []
    discovered_topics = set()
    for bean in connection.queryMBeans(None, None):
        # Normalise to a Python string so str methods behave the same no
        # matter what string type the bridge hands back.
        name = str(bean.objectName.toString())
        if not any(fragment in name for fragment in allowed_beans):
            continue
        discovered_beans.append(bean.toString())

        topic = _topic_from_object_name(name)
        if topic and topic not in ignored_topics:
            discovered_topics.add(topic)

    # Sorted so the option list is stable from one run to the next.
    return discovered_beans, sorted(discovered_topics)


class ApacheKafkaPlugin(agent_util.Plugin):
    """Agent plugin that reads Apache Kafka metrics over JMX using jpype.

    ``get_metadata`` reports which metrics are available (and for which
    topics); ``check`` reads a single metric value. Both take the JMX config
    block, which must provide ``host`` and ``port`` and may provide
    ``jvm_path``, ``username`` and ``password``.
    """
    textkey = "apache_kafka_jmx"
    label = "Apache Kafka (JMX)"
    # Set by get_metadata to the MBean server connection used for discovery.
    connection = None

    _JVM_MISSING_MSG = "Unable to find JVM, please specify 'jvm_path' in the [jmx] block of the agent config file."

    # (textkey, label, unit or None, whether the metric takes a topic option)
    _METRICS = (
        ("broker.bips.oneminuterate", "Topic Bytes In/sec - 1 min", "bytes", True),
        ("broker.bips.fiveminuterate", "Topic Bytes In/sec - 5 min", "bytes", True),
        ("broker.bips.fifteenminuterate", "Topic Bytes In/sec - 15 min", "bytes", True),
        ("broker.bips.meanrate", "Topic Bytes In/sec - Avg", "bytes", True),
        ("broker.bops.oneminuterate", "Topic Bytes Out/sec - 1 min", "bytes", True),
        ("broker.bops.fiveminuterate", "Topic Bytes Out/sec - 5 min", "bytes", True),
        ("broker.bops.fifteenminuterate", "Topic Bytes Out/sec - 15 min", "bytes", True),
        ("broker.bops.meanrate", "Topic Bytes Out/sec - Avg", "bytes", True),
        ("broker.mips.oneminuterate", "Topic Messages In/sec - 1 min", "messages", True),
        ("broker.mips.fiveminuterate", "Topic Messages In/sec - 5 min", "messages", True),
        ("broker.mips.fifteenminuterate", "Topic Messages In/sec - 15 min", "messages", True),
        ("broker.mips.meanrate", "Topic Messages In/sec - Avg", "messages", True),
        ("underreplicatedpartitions", "Replica Manager Unreplicated Partitions", "partitions", False),
        ("fetch.queue-size", "Queued messages", None, False),
        ("memory.heap.committed", "Heap Memory - Committed", "bytes", False),
        ("memory.heap.used", "Heap Memory - Used", "bytes", False),
        ("memory.heap.max", "Heap Memory - Max", "bytes", False),
    )

    @classmethod
    def _resolve_jvm_path(cls, config, jpype):
        """Return the JVM library path to start, or None if it cannot be found.

        An explicit ``jvm_path`` in the config is returned as-is; otherwise
        jpype is asked for its default. Lookup failures are logged, not raised.
        """
        if 'jvm_path' in config:
            return config['jvm_path']
        try:
            jvm_path = jpype.getDefaultJVMPath()
        except Exception:
            cls.log.info(cls._JVM_MISSING_MSG)
            cls.log.error(traceback.format_exc())
            return None
        if not jvm_path:
            cls.log.info(cls._JVM_MISSING_MSG)
            return None
        return jvm_path

    @staticmethod
    def _open_connector(config, jpype, java, javax):
        """Open and return a JMX connector for the configured host and port.

        Credentials are sent only when both ``username`` and ``password`` are
        set. Any connection error propagates to the caller.
        """
        environment = java.util.HashMap()
        if config.get('username') and config.get('password'):
            credentials = jpype.JArray(java.lang.String)([config['username'], config['password']])
            environment.put(javax.management.remote.JMXConnector.CREDENTIALS, credentials)
        url = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi" % (config['host'], int(config['port']))
        jmxurl = javax.management.remote.JMXServiceURL(url)
        return javax.management.remote.JMXConnectorFactory.connect(jmxurl, environment)

    @classmethod
    def get_metadata(cls, config):
        """Describe the metrics this plugin offers.

        Returns an empty dict when there is no config or when the JMX endpoint
        cannot be reached. Otherwise returns one entry per metric textkey with
        ``label``, ``options`` (discovered topics, or None for metrics that
        take no topic), ``status``, ``error_message`` and, where applicable,
        ``unit``. Configuration problems (missing jpype, missing host/port,
        no usable JVM) are reported through ``status``/``error_message``
        rather than raised.
        """
        status = agent_util.SUPPORTED
        msg = None

        # Check for jmx configuration block
        if not config:
            cls.log.info("No JMX configuration found")
            return {}

        # make sure jpype1 is installed first
        try:
            import jpype
            from jpype import java, javax
        except Exception:
            jpype = java = javax = None
            msg = "Unable to access JMX metrics due to missing jpype library."
            cls.log.info(msg)
            status = agent_util.MISCONFIGURED

        # Check for config settings in jmx configuration block
        for key in ['port', 'host']:
            if key not in config:
                msg = "Missing value for %s in the [jmx] block of the agent config file." % key
                cls.log.info(msg)
                status = agent_util.MISCONFIGURED

        # Without jpype there is nothing to ask about the JVM; skipping this
        # keeps the "missing jpype" message from being replaced by a
        # misleading "unable to find JVM" one.
        jvm_path = None
        if jpype is not None:
            jvm_path = cls._resolve_jvm_path(config, jpype)
            if jvm_path is None:
                msg = cls._JVM_MISSING_MSG
                status = agent_util.MISCONFIGURED

        if status == agent_util.SUPPORTED:
            try:
                if not jpype.isJVMStarted():
                    jpype.startJVM(jvm_path)
            except Exception:
                msg = "Unable to access JMX metrics because JVM cannot be started."
                cls.log.info(msg)
                status = agent_util.MISCONFIGURED

        # Topics can only be discovered over a live connection. When the
        # plugin is misconfigured, report the metrics with no topic options
        # instead of failing on a missing connection.
        topics = []
        if status == agent_util.SUPPORTED:
            try:
                connector = cls._open_connector(config, jpype, java, javax)
                # The connector is left open on purpose: the connection stays
                # published on the class attribute after this call returns.
                cls.connection = connector.getMBeanServerConnection()
            except Exception:
                msg = "Unable to access JMX metrics, JMX is not running or not installed."
                cls.log.info(msg)
                return {}
            # Gather the topics we can monitor
            _beans, topics = discover_beans(cls.connection)

        metadata = {}
        for metric_key, metric_label, unit, per_topic in cls._METRICS:
            entry = {
                "label": metric_label,
                "options": topics if per_topic else None,
                "status": status,
                "error_message": msg,
            }
            if unit is not None:
                entry["unit"] = unit
            metadata[metric_key] = entry
        return metadata

    def check(self, textkey, data, config):
        """Read one Kafka metric over JMX.

        Parameters:
            textkey: metric key; must be present in JMX_MAPPING.
            data: optional topic name, appended to the object name as
                ``,topic=<data>`` when truthy.
            config: the JMX config block (``host``, ``port``, optional
                ``jvm_path``, ``username``, ``password``).

        Returns:
            The metric's ``floatValue()``, or None when jpype is missing, the
            textkey is unknown, no JVM can be found, or the read fails. All
            failures are logged; none are raised.
        """
        try:
            import jpype
            from jpype import java, javax
        except Exception:
            self.log.error("Unable to import jpype! Is it installed?")
            return None

        # Resolve the metric first so an unknown key never opens a connection.
        parts = JMX_MAPPING.get(textkey, None)
        if not parts:
            self.log.error("Unable to find Kafka metric %s in known metrics!" % textkey)
            return None
        obj, attribute, composite_key = parts

        connector = None
        try:
            # A JVM path is only needed when the JVM still has to be started.
            if not jpype.isJVMStarted():
                jvm_path = self._resolve_jvm_path(config, jpype)
                if jvm_path is None:
                    return None
                jpype.startJVM(jvm_path)

            connector = self._open_connector(config, jpype, java, javax)
            connection = connector.getMBeanServerConnection()

            # start building the JMX object
            if data:
                obj += ',topic=%s' % data

            log_msg = "Checking Kafka metric %s" % attribute
            if composite_key is not None:
                log_msg += " with key %s" % composite_key
            self.log.debug(log_msg)

            res = connection.getAttribute(javax.management.ObjectName(obj), attribute)
            # if the metric is buried deeper in a dict, grab it
            if composite_key is not None:
                return res.contents.get(composite_key).floatValue()
            return res.floatValue()
        except Exception:
            self.log.critical("Error checking Kafka metric %s - %s \n%s" % (textkey, data, traceback.format_exc()))
            return None
        finally:
            # Each check opens its own connector; close it so repeated checks
            # do not pile up open JMX connections.
            if connector is not None:
                try:
                    connector.close()
                except Exception:
                    self.log.debug("Error closing JMX connector\n%s" % traceback.format_exc())

Anon7 - 2022
AnonSec Team