AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 3.128.168.69
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/panopta-agent/library/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /usr/lib/panopta-agent/library/container_discovery.py
import logging
import os.path
import sys

from agent_util import UnixHTTPConnection, json_loads

logger = logging.getLogger('Containers')


def check_access():
    """Report whether the local Docker daemon socket is usable.

    Returns one of three status strings:

    * ``'no-docker'``     -- ``/var/run/docker.sock`` does not exist.
    * ``'no-permission'`` -- the socket exists but connecting, sending the
      request, or reading the response header raised an exception.
    * ``'success'``       -- a response was received from the daemon.

    The HTTP status of the response is not inspected; any response at all
    counts as success.
    """
    socket_path = '/var/run/docker.sock'
    if not os.path.exists(socket_path):
        return 'no-docker'

    conn = None
    try:
        conn = UnixHTTPConnection(socket_path)
        conn.request('GET', '/containers/json', headers={'Host': 'localhost'})
        conn.getresponse()
    except Exception:
        return 'no-permission'
    finally:
        # Release the socket on every path; previously it was left open.
        # NOTE(review): assumes UnixHTTPConnection exposes close() like the
        # standard library HTTPConnection -- confirm in agent_util.
        if conn is not None:
            conn.close()

    return 'success'


def _collect_app_metadata(container, short_id, applications, plugins):
    """Collect metadata for one container from the docker plugin and each mapped app plugin.

    Returns a dict mapping plugin name to a ``(label, metadata)`` tuple.
    Plugins that are missing or that raise are logged and skipped so that a
    single failure does not abort discovery of the remaining plugins.
    """
    app_metadata = {}
    for app in ["docker"] + applications:
        try:
            plugin = plugins.plugins.get(app)
            if not plugin:
                logger.error("Unknown/unloaded docker plugin '%s'", app)
                continue
            metadata = plugin.get_metadata_docker(container, plugins.config.get(app, {})) or {}
            logger.info('Container %s: %d textkeys for %s', short_id, len(metadata), app)
            # Fall back to the plugin's textkey when it defines no label.
            plugin_label = getattr(plugin, 'label', plugin.textkey)
            app_metadata[app] = (plugin_label, metadata)
        except Exception:
            # Log through the module logger (with traceback) and carry on.
            logger.exception("Failed to collect '%s' metadata for container %s", app, short_id)
    return app_metadata


def discover_docker_containers(config, plugins, existing_containers, rebuild=False):
    """List Docker containers and attach plugin metadata to newly seen ones.

    Parameters:
        config: object providing ``has_section(name)`` and ``items(name)``;
            the optional ``docker_image_mapping`` section maps an application
            name to a Docker image name.
        plugins: object exposing ``plugins`` (name -> plugin) and ``config``
            (name -> plugin config) mappings.
        existing_containers: mapping of 12-character short container ID to a
            previously reported container dict. Matching entries are updated
            in place.
        rebuild: when true, metadata is regenerated even for containers that
            are already known.

    Returns:
        A list of container dicts. Containers that are not running and have
        never been seen before are omitted. Known containers whose ``State``
        changed carry ``"updated": True``.

    Raises:
        Whatever the socket request, ``json_loads`` or a missing ``Id`` /
        ``State`` / ``Image`` key raises; only plugin errors are swallowed.
    """
    conn = UnixHTTPConnection('/var/run/docker.sock')
    try:
        conn.request('GET', '/containers/json?all=true', headers={'Host': 'localhost'})
        raw = conn.getresponse().read()
    finally:
        # Release the socket even if the request fails.
        # NOTE(review): assumes UnixHTTPConnection exposes close() like the
        # standard library HTTPConnection -- confirm in agent_util.
        conn.close()
    docker_containers = json_loads(raw)

    # Covers both an empty list and a null payload.
    if not docker_containers:
        return []

    # image name -> list of application (plugin) names configured for it
    app_mapping = {}
    if config.has_section('docker_image_mapping'):
        for app, image in config.items('docker_image_mapping'):
            app_mapping.setdefault(image, []).append(app)

    containers = []
    for container in docker_containers:
        short_id = container["Id"][:12]
        state = container["State"]

        existing_container = existing_containers.get(short_id)

        if not existing_container and state != "running":
            # Don't send stopped containers that we've never seen
            continue

        if existing_container and not rebuild:
            # Compare before update() overwrites the previous state.
            changed_state = existing_container["State"] != state
            existing_container.update(container)
            if changed_state:
                # Flag this container as updated
                existing_container["updated"] = True
            container = existing_container
        else:
            # Only fetch metadata for new containers (or on a forced rebuild)
            container["container_platform"] = "docker"
            applications = list(app_mapping.get(container["Image"], []))
            container["applications"] = applications
            container["app_metadata"] = _collect_app_metadata(
                container, short_id, applications, plugins
            )

        containers.append(container)

    return containers

def find_docker_container(container_id):
    """Look up a single Docker container by ID.

    Parameters:
        container_id: the 12-character short ID, or the full container ID.

    Returns:
        The matching container dict from the daemon's ``/containers/json``
        listing (stopped containers included), or ``None`` if no container
        matches.
    """
    conn = UnixHTTPConnection('/var/run/docker.sock')
    try:
        conn.request('GET', '/containers/json?all=true', headers={'Host': 'localhost'})
        raw = conn.getresponse().read()
    finally:
        # Release the socket even if the request fails.
        # NOTE(review): assumes UnixHTTPConnection exposes close() like the
        # standard library HTTPConnection -- confirm in agent_util.
        conn.close()

    # "or []" tolerates a null payload instead of failing on iteration.
    for container in json_loads(raw) or []:
        full_id = container["Id"]
        # Short-ID match is the original behaviour; a full-ID match is
        # accepted as well so callers holding the long form also succeed.
        if container_id == full_id[:12] or container_id == full_id:
            return container

    return None

Anon7 - 2022
AnonSec Team