AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 3.135.247.94
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /var/opt/nydus/ops/customer_local_ops/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /var/opt/nydus/ops/customer_local_ops//__init__.py
from datetime import datetime
from typing import Any, Dict, List, Tuple, Union
from enum import Enum
import importlib
import logging
import shutil

from primordial.constants import CANONICAL_TIMESTRING_FORMAT
from primordial.encryptor import Encryptor

from customer_local_ops.exceptions import EncryptionKeyNotFound
from customer_local_ops.util.execute import runCommand
from customer_local_ops.util.helpers import create_file
from customer_local_ops.util.retry import Retry


# Module-level logger, named after this module's import path
LOG = logging.getLogger(__name__)


class OpType(Enum):
    """
    Category of an Ops class.

    The member's string value is what build_result_dict reports under the 'op_type' key of a result dict.
    """
    # Checked by Ops.get_os_op to decide whether to look up an OS/control-panel hybrid class
    CONTROL_PANEL = "cp_op"
    OPERATING_SYSTEM = "os_op"
    CONTROL_PANEL_OPERATING_SYSTEM = "cp_os_op"
    # Default op_type of the Ops base class
    UNKNOWN = "unknown"


# TODO: Move to central repo
class ResourceType(Enum):
    """
    Known resource types, each identified by a lowercase string value.

    NOTE(review): presumably these name the hosting platform a VM runs on - confirm against callers.
    """
    OVH = "ovh"
    OPENSTACK = "openstack"
    VIRTUOZZO_VM = "virtuozzo_vm"


# Every ResourceType member, in definition order
RESOURCE_TYPES = list(ResourceType)


# What an op may hand back to Nydus: a Retry instance, a (success, result) pair,
# or a (success, result, retry_interval) triple
NydusResult = Union[Retry, Tuple[bool, Dict[str, Any]], Tuple[bool, Dict[str, Any], int]]


class OpResult:
    """
    Container for everything an operation produced: its success flag, its result payload and an
    optional retry interval.
    """
    def __init__(self, success: bool = False, result: Dict[str, Any] = None, retry_interval: int = None):
        """
        Create an OpResult

        :param success: True when the operation succeeded
        :param result: Messages or other data produced by the operation, normally a dictionary
        :param retry_interval: An optional retry interval, in seconds. If this value is greater than or equal
                               to zero, the operation will be retried in that many seconds.
        """
        self.success, self.result, self.retry_interval = success, result, retry_interval

    def _result_text(self, key: str) -> str:
        """Look up ``key`` in the result payload; a payload that is not a dict yields an empty string"""
        payload = self.result
        if not isinstance(payload, dict):
            return ''
        return payload.get(key, '')

    @property
    def errs(self) -> str:
        """Shortcut for the 'errs' entry of the result dict ('' if missing or if result is not a dict)"""
        return self._result_text('errs')

    @property
    def outs(self) -> str:
        """Shortcut for the 'outs' entry of the result dict ('' if missing or if result is not a dict)"""
        return self._result_text('outs')

    def as_tuple(self) -> NydusResult:
        """
        Build the tuple to pass back to Nydus: (success, result) when no retry interval is set, otherwise
        (success, result, retry_interval).
        """
        if self.retry_interval is None:
            return self.success, self.result
        return self.success, self.result, self.retry_interval


class Ops:
    """
    Base class for op classes.

    Provides shared helpers for encryption, building Nydus results, running commands and collecting
    utilization. Several ops defined here only raise NotImplementedError.
    """
    # Path handed to shutil.disk_usage() by _get_disk_utilization(); None in this base class
    DISK_UTILIZATION_PATH = None
    # Substrings that _result_handler() looks for in a failed command's stderr; a match produces a Retry
    RETRYABLE_ERRS = []
    # File that _create_panopta_manifest_file() writes the Panopta agent configuration to
    PANOPTA_MANIFEST_FILE = '/etc/panopta-agent-manifest'
    DISTRO = None  # override
    OS_VERSION = None  # override
    QEMU_PACKAGE_NAME = None  # override

    def __str__(self):
        """Return the bare name of this instance's class (no module path)"""
        return type(self).__name__

    # Category of this Ops class; the base class reports UNKNOWN
    op_type = OpType.UNKNOWN

    def get_op_type(self):
        """Return the OpType stored in this object's ``op_type`` attribute"""
        return self.op_type

    def _get_vm_tag(self):
        """
        Return this VM's tag.  Same return format as runCommand, i.e. the (exit_code, outs, errs) triple that
        _encryption_key unpacks.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def _encryption_key(self):
        """
        Obtain the key used by encrypt() and decrypt(): the output of _get_vm_tag().

        :return: The output of the VM tag lookup
        :raises EncryptionKeyNotFound: if the lookup exits with a non-zero code or produces no output
        """
        status, tag, tag_errs = self._get_vm_tag()
        if status == 0 and tag:
            return tag
        LOG.error('Could not obtain encryption key')
        raise EncryptionKeyNotFound(status, tag, tag_errs)

    def decrypt(self, encrypted):
        """
        Decrypt an encrypted value with this VM's encryption key.

        Any failure (including failure to obtain the key) is logged with its traceback and re-raised.

        :param encrypted: The value to be decrypted
        :return: The value returned by Encryptor.decrypt
        """
        try:
            secret = self._encryption_key()
            decrypted = Encryptor.decrypt(encrypted, secret)
        except Exception:
            LOG.exception('Decrypt failed')
            raise
        else:
            return decrypted

    def encrypt(self, target: str) -> str:
        """
        Encrypt a plain-text value with this VM's encryption key.

        Any failure (including failure to obtain the key) is logged with its traceback and re-raised.

        :param target: The value to be encrypted
        :return: An encrypted string
        """
        try:
            secret = self._encryption_key()
            encrypted = Encryptor.encrypt(target, secret)
        except Exception:
            LOG.exception('Encrypt failed')
            raise
        else:
            return encrypted

    def write_file(self, path, content):
        """
        Write text to a file opened in 'w' mode with UTF-8 encoding.

        :param path: Path of the file to write
        :param content: The text to write
        """
        LOG.debug('Writing file: %s', path)
        with open(path, mode='w', encoding='utf-8') as out_file:
            out_file.write(content)

    def get_os_op(self, os_op: str, control_panel_class_name: str = None) -> "Ops":
        """
        Import and instantiate an operating system Ops class or a ControlPanel-OS hybrid Ops class

        :param os_op: The operating system class name e.g. linux.CentOS7
        :param control_panel_class_name: The control panel class name e.g. cpanel.CPanel
        :return: An instance of the operating system class e.g. CentOS7 or CP-OS class e.g. CentOS7CPanel
        """
        # The last two dotted components are the family name and the os class name, e.g. linux.CentOS7
        segments = os_op.split(".")
        class_name = segments[-1]
        family = segments[-2]
        package = "customer_local_ops"

        # A control panel class asking for its os op wants the operating system/control panel hybrid
        # class, named after itself
        if control_panel_class_name is None and self.get_op_type() == OpType.CONTROL_PANEL:
            control_panel_class_name = str(self)

        if control_panel_class_name is None:
            # Plain operating system class
            module_path = "{prefix}.operating_system.{family_name}".format(prefix=package, family_name=family)
        else:
            # Hybrid class: <OsClass><ControlPanelClass> in module <family>_<controlpanelclass>
            class_name = class_name + control_panel_class_name
            module_path = "{prefix}.control_panel.{family_name}_{cp_class_name}".format(
                prefix=package,
                family_name=family,
                cp_class_name=control_panel_class_name.lower())

        target_class = getattr(importlib.import_module(module_path), class_name)
        return target_class()

    def build_result_dict(self, outs: str, errs: str, op_name: str) -> Dict[str, Any]:
        """
        Build a result dict from command output (outs, errs), with NUL characters removed from both
        streams and this op's metadata (class name, op type, op name) added

        :param outs: The stdout from the run command
        :param errs: The stderr from the run command
        :param op_name: the name of the op calling this function
        :return: a result dict with keys 'outs', 'errs', 'cls_name', 'op_type' and 'op_name'
        """
        nul = "\u0000"
        return {
            'outs': outs.replace(nul, ""),
            'errs': errs.replace(nul, ""),
            'cls_name': str(self),
            'op_type': self.get_op_type().value,
            'op_name': op_name,
        }

    def get_result_data(self, result: Any) -> OpResult:
        """
        This function takes a result tuple or any other result value (e.g. a dict) and wraps it in an OpResult
        of a consistent format that can then be used by ops to inspect the result or build another result
        based on the first one

        Tuples are interpreted by length:
        1 element - (result,), treated as successful;
        2 elements - (success, result);
        3 elements - (success, result, retry_interval).
        Any non-tuple value is treated as the result of a successful op.

        :param result: a dict or tuple result from an op
        :return: A data structure containing the complete result
        :raises ValueError: if the tuple is empty or has more than three elements
        """
        if not isinstance(result, tuple):
            return OpResult(True, result)

        result_len = len(result)
        if result_len == 1:
            return OpResult(True, result[0])
        if result_len in (2, 3):
            return OpResult(*result)
        if result_len == 0:
            # An empty tuple used to be reported as having "too many elements: 0"
            raise ValueError("result tuple is empty")
        # Should never get here
        raise ValueError("result tuple has too many elements: {}".format(result_len))

    def build_result_from_other_result(self, result: Any, op_name: str) -> NydusResult:
        """
        Re-wrap the result of a secondary op (usually an os op) as a result of the primary op (usually a
        control panel op).

        A Retry is passed through untouched. For a dict result, the secondary op's metadata (op type, class
        name, op name) is prepended to its output and errors, so that it is clear:
        a). what the primary op was and
        b). that the error/output is coming from the secondary op
        Any other result value is stringified and used as the output (on success) or the errors (on failure).

        :param result: a dict, tuple, Retry or str result from an op
        :param op_name: the name of the op calling this function
        :return: a result tuple or Retry instance
        """
        if isinstance(result, Retry):
            return result

        data = self.get_result_data(result)
        payload = data.result
        if not isinstance(payload, dict):
            # No metadata to carry over: the stringified value is either the output or the error text
            text = str(payload)
            outs, errs = (text, '') if data.success else ('', text)
        else:
            # Metadata missing from the secondary result falls back to this op's own
            prefix = "{op_type} {cls_name}.{op_name_other}".format(
                op_type=payload.get('op_type', self.get_op_type().value),
                op_name_other=payload.get('op_name', op_name),
                cls_name=payload.get('cls_name', str(self)))
            outs = "{prefix} outs: {outs}".format(prefix=prefix, outs=data.outs)
            errs = data.errs
            if errs:
                # Only non-empty errors are labelled
                errs = "{prefix} errs: {errs}".format(prefix=prefix, errs=data.errs)

        data.result = self.build_result_dict(outs, errs, op_name)
        return data.as_tuple()

    def build_result_from_cmd_output(self, exit_code: int, outs: str, errs: str, op_name: str,
                                     success_message: str = None) -> NydusResult:
        """
        Turn command output (exit_code, outs, errs) into a (success, result dict) tuple carrying op metadata.
        The command counts as successful when its exit code is zero.

        :param exit_code: The exit code returned from the run command
        :param outs: The stdout from the run command
        :param errs: The stderr from the run command
        :param op_name: the name of the op calling this function
        :param success_message: A message to use in place of 'outs' if the command ran successfully (optional)
        :return: a result tuple
        """
        succeeded = exit_code == 0
        use_message = succeeded and success_message is not None
        reported_outs = success_message if use_message else outs
        return succeeded, self.build_result_dict(reported_outs, errs, op_name)

    def run_command_and_handle_result(self,
                                      command: Union[List[str], str],
                                      op_name: str,
                                      *args: Any,
                                      intermediate_result: Dict[str, Any] = None,
                                      **kw: Any) -> NydusResult:
        """Run command, return result suitable for a Nydus operation.

        The command's exit code, stdout and stderr are handed to _result_handler, so the return value is
        a Retry when the command failed and its stderr contains one of RETRYABLE_ERRS.

        :param command: command list or string as per runCommand
        :param op_name: name of the operation; used as runCommand tag and result op_name
        :param args: additional positional arguments for runCommand
        :param intermediate_result:  Dict containing metadata for retries
        :param kw: additional keyword arguments for runCommand
        :return: command result as a Nydus operation result
        """
        exit_code, outs, errs = runCommand(command, op_name, *args, **kw)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    # def configure_ips(self, network_node):
    #     raise NotImplementedError

    # def patch_system(self, payload):
    #     raise NotImplementedError

    # def patch_system_specific_update(self, payload):
    #     raise NotImplementedError

    # def list_vm_patches(self, payload):
    #     raise NotImplementedError

    def add_user(self, payload, unused=None):
        """
        Not implemented in this base class.

        NOTE(review): ``unused`` is presumably the same Archon sequencing artifact described on
        shutdown_clean - confirm.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    # def remove_user(self, payload):
    #     raise NotImplementedError

    # def change_password(self, payload):
    #     raise NotImplementedError

    # def change_hostname(self, payload):
    #     raise NotImplementedError

    # def install_package(self, payload):
    #     raise NotImplementedError

    # def uninstall_package(self, payload):
    #     raise NotImplementedError

    def shutdown_clean(self, unused=None):
        """
        Not implemented in this base class.

        :raises NotImplementedError: always
        """
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        # chain for sequencing.
        raise NotImplementedError

    # def stop_web_server(self, none):
    #     raise NotImplementedError

    # def start_web_server(self, none):
    #     raise NotImplementedError

    # def prepare_web_server_ssl(self, none):
    #     raise NotImplementedError

    # def deploy_cert(self, payload):
    #     raise NotImplementedError

    # def install_cert(self, payload):
    #     raise NotImplementedError

    # def configure_mta(self, payload):
    #     raise NotImplementedError

    def enable_admin(self, username, unused=None):
        """
        Not implemented in this base class.

        :raises NotImplementedError: always
        """
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        # chain for sequencing.
        raise NotImplementedError

    def disable_admin(self, username, unused=None):
        """
        Not implemented in this base class.

        :raises NotImplementedError: always
        """
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        # chain for sequencing.
        raise NotImplementedError

    def disable_all_admins(self, unused=None):
        """
        Not implemented in this base class.

        :raises NotImplementedError: always
        """
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        # chain for sequencing.
        raise NotImplementedError

    # def snapshot_clean(self, payload):
    #     raise NotImplementedError

    # def snapshot_prep(self, payload):
    #     raise NotImplementedError

    def _get_cpu_utilization(self) -> Dict[str, float]:
        """Return current CPU utilization.

        Not implemented in this base class; get_utilization() merges the returned dictionary into its result.

        :returns: dictionary with keys:
            - cpuUsed: percentage in range 0-100
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def _get_disk_utilization(self) -> Dict[str, int]:
        """Return current disk utilization for DISK_UTILIZATION_PATH.

        Byte counts from shutil.disk_usage() are converted to whole mebibytes, rounding down.

        :returns: dictionary with keys:
            - diskTotal: mebibytes
            - diskUsed: mebibytes
        """
        mib_shift = 20  # shifting right by 20 bits divides by 2**20: B > MiB
        total_bytes, used_bytes, _free_bytes = shutil.disk_usage(self.DISK_UTILIZATION_PATH)
        return {
            'diskTotal': total_bytes >> mib_shift,
            'diskUsed': used_bytes >> mib_shift,
        }

    def _get_memory_utilization(self) -> Dict[str, int]:
        """Return current memory utilization.

        Not implemented in this base class; get_utilization() merges the returned dictionary into its result.

        :returns: dictionary with keys:
            - memoryTotal: mebibytes
            - memoryUsed: mebibytes
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def get_utilization(self) -> Dict[str, Union[float, int, str]]:
        """Return current system utilization.

        CPU, disk and memory figures are gathered (in that order) and merged into one dictionary, which is
        then stamped with the current UTC time.

        :returns: dictionary with keys:
            - collected: current time in
                :data:`~primordial.constants.CANONICAL_TIMESTRING_FORMAT`
            - memoryTotal: mebibytes
            - memoryUsed: mebibytes
            - diskTotal: mebibytes
            - diskUsed: mebibytes
            - cpuUsed: percentage in range 0-100
        """
        collectors = (self._get_cpu_utilization,
                      self._get_disk_utilization,
                      self._get_memory_utilization)
        # Gather every sample before merging, so nothing is merged if a collector raises
        samples = [collect() for collect in collectors]

        utilization = {}
        for sample in samples:
            utilization.update(sample)
        collected_at = datetime.utcnow()
        utilization['collected'] = collected_at.strftime(CANONICAL_TIMESTRING_FORMAT)
        return utilization

    def _create_panopta_manifest_file(self, payload):
        """ Panopta agent uses a manifest file for configuration
        of the customer id and templates to use.  This method creates that manifest file.

        """
        manifest_file = self.PANOPTA_MANIFEST_FILE
        customer_key = payload['customer_key']
        template_ids = payload['template_ids']
        cust_key_contents = """[agent]
customer_key = {customer_key}
templates = {template_ids}
enable_countermeasures = false""".format(customer_key=customer_key, template_ids=template_ids)

        server_key = payload.get('server_key')
        if server_key:
            cust_key_contents += "\nserver_key = {server_key}".format(server_key=server_key)
        fqdn = payload.get('fqdn')
        if fqdn:
            cust_key_contents += "\nfqdn = {fqdn}".format(fqdn=fqdn)
        server_name = payload.get('serverName')
        if server_name:
            cust_key_contents += "\nserver_name = {serverName}".format(serverName=server_name)
        disable_server_match = payload.get('disable_server_match', False)
        if disable_server_match:
            cust_key_contents += "\ndisable_server_match = true"
        create_file(manifest_file, cust_key_contents)

    def install_panopta(self, payload, *args, **kwargs):
        """
        Not implemented in this base class.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def delete_panopta(self, *args, **kwargs):
        """
        Not implemented in this base class.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def get_panopta_server_key(self, *args, **kwargs):
        """
        Not implemented in this base class.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def _result_handler(self, exit_code: int, outs: str, errs: str, op_name: str,
                        intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Convert the output of a run command into a Nydus result, or a Retry for retryable failures.

        A failed command whose stderr contains one of RETRYABLE_ERRS yields a Retry; the first matching
        error is recorded under the 'retryable_error' key of ``intermediate_result`` (the dict passed in is
        updated in place; a new one is created when none was given). Every other outcome yields a
        (success, result dict) tuple, with "<op_name> succeeded" as the output of a successful command.

        :param exit_code: The exit code from the executed command
        :param outs: The stdout output from the executed command
        :param errs: The stderr output from the executed command
        :param op_name: The name of the op
        :param intermediate_result: Dict containing metadata for retries
        :return: A Nydus result tuple, or Retry
        """
        LOG.info(self.RETRYABLE_ERRS)
        success, result = self.build_result_from_cmd_output(exit_code, outs, errs, op_name, op_name + " succeeded")
        if success:
            return success, result

        for candidate in self.RETRYABLE_ERRS:
            if candidate not in errs:
                continue
            retry_data = {} if intermediate_result is None else intermediate_result
            retry_data['retryable_error'] = candidate
            return Retry(retry_data)
        return success, result

Anon7 - 2022
AnonSec Team