Server IP : 92.204.138.22 / Your IP : 18.119.172.85 Web Server : Apache System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64 User : internationaljou ( 1019) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /proc/self/root/var/opt/nydus/ops/primordial/service/ |
Upload File : |
# -*- coding: utf-8 -*- from argparse import ArgumentParser from datetime import datetime from collections import defaultdict from functools import partial import json import logging import os import os.path import platform import signal import socket import subprocess from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union # pylint: disable=W0611 from uuid import uuid1 from kazoo.client import KazooClient, KazooState from primordial.constants import CANONICAL_TIMESTRING_FORMAT from primordial.flow import retry, RetryError from primordial.settings import (ZK_HOSTS, GD_ZKRUN_VIRTUALENV_PATH, GD_ZKRUN_COMMAND, GD_ZKRUN_NAME, GD_ZKRUN_HOSTS, GD_ZKRUN_SERVICE_TYPE, GD_ZKRUN_PORT, GD_ZKRUN_SSL_PORT, GD_ZKRUN_LOCATIONS) SIGNALS = defaultdict(int) # type: Dict[int, int] # map pid to running ZKSTART_STATUS = {} LOGGER = logging.getLogger(__name__) def get_zkhosts(): """Gets the zkhosts from the default settings""" return ZK_HOSTS LocationsType = Optional[Union[str, Iterable[str]]] class ZkRegistrar: """Implementation of a basic Zookeeper ephemeral node registrar.""" locations = None # type: LocationsType def __init__(self, listener: Optional[Callable] = None, name: Optional[str] = None, zkhosts: Optional[str] = None, service_type: str = "registrations", port: Optional[int] = None, ssl_port: Optional[int] = None, locations: LocationsType = None) -> None: """Create a zookeeper client and hang on to the listener. :param listener: The zookeeper listener action; is activated by the kazoo client when updates occur to watched zookeeper parameters. :param name: The path base for this service. :param zkhosts: The zkhost or hosts (list) to use with the zk client; if none set, a local function is used that uses hfs best practices to find the zk host setting for the machine this is running on. :param service_type: verticals for REST interfaces and registrations for Kafka listeners, pollers, etc. :param port: port the service is listening on if any :param ssl_port: ssl port if any :param locations: URLs that correspond to this server if any """ if zkhosts is None: zkhosts = get_zkhosts() if isinstance(zkhosts, (list, tuple)): zkhosts = ",".join(zkhosts) self.listener = listener or self.default_zk_state_handler self.name = name self.zk = KazooClient(hosts=zkhosts) self.zk_uuid = str(uuid1()) self.service_type = service_type self.port = port self.ssl_port = ssl_port self.locations = locations self.connection_was_lost = False self.need_to_register = True def default_zk_state_handler(self, state: str) -> None: """A default state handler function""" LOGGER.debug('Zookeeper state change: %s (%s)', state, self.zk.client_state) # The state transition we need to handle is LOST->CONNECTED, in which # case we need to re-register the service in ZooKeeper. if state == KazooState.LOST: self.connection_was_lost = True self.need_to_register = False elif state == KazooState.CONNECTED and self.connection_was_lost: self.connection_was_lost = False self.need_to_register = True def register(self) -> None: """Register the listener""" self.zk.add_listener(self.listener) self.zk.start() if self.locations is None: locations = [] # type: Iterable[str] elif not isinstance(self.locations, (list, tuple)): locations = [self.locations] # type: ignore else: locations = self.locations zk_register_data = { 'address': platform.node(), 'id': self.zk_uuid, 'name': self.name, 'locations': locations, 'registrationTimeUTC': datetime.utcnow().strftime(CANONICAL_TIMESTRING_FORMAT), # pylint: disable=E1120 'serviceType': 'DYNAMIC' } # type: Dict[str, Any] if self.port is not None: zk_register_data['port'] = self.port if self.ssl_port is not None: zk_register_data['sslPort'] = self.ssl_port self.zk.ensure_path('/service/%s/%s' % (self.service_type, self.name)) self.zk.create('/service/%s/%s/%s' % (self.service_type, self.name, self.zk_uuid), json.dumps(zk_register_data).encode('utf-8'), ephemeral=True) self.need_to_register = False def unregister(self) -> None: """Unregister the listener and stop""" self.zk.delete('/service/%s/%s/%s' % (self.service_type, self.name, self.zk_uuid)) self.zk.remove_listener(self.listener) self.zk.stop() def shell_run(command: str) -> subprocess.Popen: """A hyper simplistic shell command runner. Linux only. Uses bash as /bin/bash. NB: this does NOT change the current working directory of the shell or the parent process; any command you run will have to be on the PATH or part of the entered virtualenv, etc. :param command: This is a command that will be run in a subshell, verbatim. The caller is fully responsible for all related security concerns. This function will not perform any processing on the string, e.g.no string substitutions, no interpolations, etc. It will simply create a shell, execute the command on a single input to the shell, and then manage the process. Shell metacharacters and expansions ** will ** be processed as per standard shell semantics. E.g. .. code-block:: python uwsgi_exe = ("source %(virtualenv)s/bin/activate && " "cd %(virtualenv)s/.. && " "uwsgi --master --lazy-apps --processes 5 " "--die-on-term --module wsgi:application --http :80" % {'virtualenv': vpath}) :returns: The structure returned by subprocess.Popen: https://docs.python.org/3/library/subprocess.html#subprocess.Popen """ return subprocess.Popen(["/bin/bash", "-c", command]) # pylint: disable=consider-using-with def enter_virtualenv_and_run(virtualenv_path: str, command: str) -> subprocess.Popen: """Enter a virtual environment and run a command. Simplifies a common python use case of entering a virtualenv and running a local shell command. NB: this does NOT change the current working directory of the shell or the parent process; any command you run will have to be on the PATH or part of the entered virtualenv. :param virtualenv_path: A string representing the absolute path to a virtualenv. :param command: As per shell_run. :returns: The structure returned by subprocess.Popen: https://docs.python.org/3/library/subprocess.html#subprocess.Popen :raises ValueError: if the virtualenv path appears to be in error. """ vpath = os.path.abspath(virtualenv_path) if not os.path.exists(os.path.join(vpath, 'bin', 'activate')): raise ValueError('Specified path (%s) does not look like a python virtualenv!' % virtualenv_path) venv_command = ("source %(virtualenv_path)s/bin/activate && %(command)s" % {'virtualenv_path': virtualenv_path, 'command': command}) return shell_run(venv_command) def bind_signal_handlers(signals: List[int], signal_handler_func: Callable) -> None: """Bind one or more signals to a signal handler. :param signals: A list of signals to be bound :param signal_handler_func: The handler to bind to the signals :raises ValueError: on invalid signal number """ for signal_ in signals: signal.signal(signal_, signal_handler_func) def default_signal_handler(signum: int, _) -> None: """By default, count calls to a signal. :param signum: The signal which was called """ SIGNALS[signum] += 1 def set_default_signals() -> None: """Bind default signal handlers.""" bind_signal_handlers([signal.SIGINT, signal.SIGTERM], default_signal_handler) def wait_for_process(pid: int, sleep_secs: float, max_attempts: int) -> Tuple[int, int]: """Wait for a process to finish running. :param pid: The process ID on which to wait :param sleep_secs: The number of seconds to sleep between checks :param max_attempts: The number of times to attempt checking on the pid """ def wait_for_pid(mypid): w_process_id, w_exit_status = os.waitpid(mypid, os.WNOHANG) if w_process_id == 0: raise Exception("Not done yet") return w_process_id, w_exit_status try: process_id, exit_status = retry(partial(wait_for_pid, pid), sleep_secs=sleep_secs, max_attempts=max_attempts) except RetryError as _: process_id, exit_status = None, None return process_id, exit_status def check_tcp_port(port: int) -> bool: """Check to see if a local port is accepting TCP connections. :param port: The port number to be checked. """ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) s.settimeout(1) ret = False try: s.connect(('', port)) s.shutdown(socket.SHUT_RDWR) ret = True except socket.timeout: pass finally: s.close() return ret def zk_start(args: Optional[List[str]] = None) -> ZkRegistrar: """Start a process wrapped in a watcher and register with zookeeper. Steps: 1. Get the ephemeral registration data from command-line arguments or environment variables 2. Get the command to run from command-line arguments or environment variables 3. Run the command. 4. If successful, register with zookeeper. 5. Wait for the command to complete. 6. De-register with zookeeper. Command-line arguments: * --virtualenv-path * --command * --name * --zkhosts * --service_type (verticals, registrations) * --port * --ssl-port * --locations Environment variables (from settings.py): * GD_ZKRUN_VIRTUALENV_PATH * GD_ZKRUN_COMMAND * GD_ZKRUN_NAME * ZK_HOSTS * GD_ZKRUN_HOSTS * GD_ZKRUN_SERVICE_TYPE * GD_ZKRUN_PORT * GD_ZKRUN_SSL_PORT * GD_ZKRUN_LOCATIONS Methodology: * Command-line arguments override environment variables. """ parser = ArgumentParser(description="Start a process with a zookeeper registration") parser.add_argument('--virtualenv-path', help="Path to a python venv") parser.add_argument('--command', required=True, help="Shell command to run, properly escaped") parser.add_argument('--name', required=True, help="Service name (zk base node)") parser.add_argument('--zkhosts', action='append', help="Zookeeper hosts") parser.add_argument('--service_type', required=True, help="Service type, registrations or verticals") parser.add_argument('--port', type=int, help="Port of service if any") parser.add_argument('--ssl-port', type=int, help="Ssl port of service if any") parser.add_argument('--locations', action='append', help="Urls for MCP use if any") res = parser.parse_args(args=args) virtualenv_path = res.virtualenv_path or GD_ZKRUN_VIRTUALENV_PATH or None command = res.command or GD_ZKRUN_COMMAND or None name = res.name or GD_ZKRUN_NAME or None zkhosts = res.zkhosts or GD_ZKRUN_HOSTS or ZK_HOSTS or None service_type = res.service_type or GD_ZKRUN_SERVICE_TYPE or None port = res.port or GD_ZKRUN_PORT or None ssl_port = res.ssl_port or GD_ZKRUN_SSL_PORT or None locations = res.locations or GD_ZKRUN_LOCATIONS or None if virtualenv_path is not None: LOGGER.info("Running venv run %s { %s } { %s }", enter_virtualenv_and_run, virtualenv_path, command) pid = enter_virtualenv_and_run(virtualenv_path, command).pid # type: ignore else: LOGGER.info("Running shell run %s { %s }", shell_run, command) pid = shell_run(command).pid # type: ignore LOGGER.info("Started process %s", pid) ZKSTART_STATUS[pid] = True def zkstart_signal_handler(_signum, _frame, runpid=None): LOGGER.info("Signal handler called with %s, %s", _signum, _frame) ZKSTART_STATUS[runpid] = False bind_signal_handlers([signal.SIGINT, signal.SIGTERM], partial(zkstart_signal_handler, runpid=pid)) registrar = ZkRegistrar(name=name, zkhosts=zkhosts, service_type=service_type, port=port, # type: ignore ssl_port=ssl_port, locations=locations) # type: ignore registrar.register() while ZKSTART_STATUS[pid] is True: if registrar.need_to_register is True: registrar.register() process_id, exit_status = wait_for_process(pid, 5, 3) if process_id is not None: LOGGER.info('Process %s exited with status %s', pid, exit_status) break LOGGER.info("Clean shutdown of %s", pid) registrar.unregister() return registrar # this is harmless, and handy for testing