AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 18.223.195.70
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /proc/self/root/var/opt/nydus/ops/customer_local_ops/util/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /proc/self/root/var/opt/nydus/ops/customer_local_ops/util/execute.py
# -*- coding: utf-8 -*-

import logging
import os
import sys
import subprocess
import tempfile

from pathlib import Path

from typing import Any, Tuple, Union, List
from subprocess import list2cmdline

from primordial.sizes import ByteSize

from customer_local_ops.util import fmap

DEFAULT_MAX_SEGMENT_SIZE = ByteSize(KiBytes=32)
LOG = logging.getLogger(__name__)
MAX_COMMAND_RESPONSE_BUFFER_SIZE = 2 * DEFAULT_MAX_SEGMENT_SIZE

# Type aliases
RunCommandResult = Tuple[int, str, str]

# pylint: disable=import-error
if sys.platform == 'win32':
    import win32con
    from win32com.shell import shellcon
    from win32com.shell import shell

    HANDLE = int
    HWND = int
    HKEY = int
    DWORD = int

    def shell_start(file_name: str,
                    params: str = None,
                    verb: str = "open",
                    show: int = win32con.SW_SHOWNORMAL,
                    mask: int = shellcon.SEE_MASK_FLAG_DDEWAIT,
                    dir_name: str = None,
                    id_list: Any = None,
                    class_id: str = None,
                    class_key: HKEY = None,
                    hot_key: DWORD = None,
                    monitor: HANDLE = None,
                    window: HWND = None) -> bool:
        """
        Wrapper for the Win32 API call ShellExecuteEx. This function can be used on Windows platforms any time Nydus
        needs to restart itself for any reason because ShellExecuteEx allows the child process to survive after its
        parent process stops. Initially added to resolve a DLL conflict issue, this could also be useful for
        upgrading Nydus, for example.

        Further information for ShellExecuteEx can be found at
        https://docs.microsoft.com/en-us/windows/win32/api/shellapi/nf-shellapi-shellexecuteexa

        :param file_name:   The name of the executable to be started. ShellExecuteEx also handles file associations,
                            so this could also be the name of a document, for example, to be opened with the
                            associated application, or a URL to be opened by the user's default browser.
        :param params:      Parameters to be passed to the application. Should not be specified if file_name
                            is NOT an executable file.
        :param verb:        Action to take. May be one of 'edit', 'explore', 'find', 'open', 'print', 'properties' or
                            'runas'. Defaults to 'open'
        :param show:        Specify how an application is to be shown
        :param mask:        Flags that indicate content and validity of other parameters
        :param dir_name:    Name of the working directory. If None, the current directory is used
        :param id_list:     See Microsoft documentation at the URL above
        :param class_id:    See Microsoft documentation at the URL above
        :param class_key:   See Microsoft documentation at the URL above
        :param hot_key:     See Microsoft documentation at the URL above
        :param monitor:     See Microsoft documentation at the URL above
        :param window:      See Microsoft documentation at the URL above
        :return: True if the file/application is successfully launched; otherwise, False
        """
        def add_field(key: str, val: Any) -> None:
            """
            It is an error to include a field with a value of None in the structure passed to ShellExecuteEx, so this
            function only adds a key to the dictionary if the value is not None.

            :param key: The name of the SHELLEXECUTEINFO field
            :param val: The value of the field
            """
            if val is not None:
                shell_execute_info[key] = val

        shell_execute_info = {}
        add_field('fMask', mask)
        add_field('hwnd', window)
        add_field('lpVerb', verb)
        add_field('lpFile', file_name)
        add_field('lpParameters', params)
        add_field('lpDirectory', dir_name)
        add_field('nShow', show)
        add_field('lpIDList', id_list)
        add_field('lpClass', class_id)
        add_field('hkeyClass', class_key)
        add_field('dwHotKey', hot_key)
        add_field('hMonitor', monitor)

        return shell.ShellExecuteEx(**shell_execute_info)
else:
    def shell_start(**kwargs) -> bool:
        return True


def pfx(line):
    return '| ' + line


def pfxs(lines):
    return list(map(pfx, lines))


def trim_visible_output_to_lines(inp, out):
    """Converts input blob to lines... but not too many"""
    if inp is not None:
        out = out.replace(inp, '<<INPUT>>', 1)
    outl = out.strip().split('\n')
    if not outl:
        return outl
    if len(outl) == 1 and not outl[0].strip():
        return []  # no single blank lines please
    if len(outl) < 11:
        return pfxs(outl)
    return pfxs(outl[:5] + ['...elided %d lines...' % (len(outl) - 10)] + outl[-5:])


def mark_error_output(errOut):
    "Converts error blob into lines with prefix marks"
    se = errOut.strip()
    if se:
        for L in se.split('\n'):
            yield 'ERR: ' + L


class EndsBuffer:
    """String buffer that maintains two segments: a first segment and a
       last segment (i.e. the two ends), where each has a maximum size
       limit.  Any extra output in between these is discarded.  The
       assumption is that if there is a huge amount of output that
       some must be discarded to avoid memory overflows and that
       generally the first and last stuff is the most relevant.

    """
    def __init__(self, max_segment_size=None):
        self._mss = ByteSize(max(0, max_segment_size or 0)) or DEFAULT_MAX_SEGMENT_SIZE
        self._data = ''
        self._elided = False

    def add_str(self, data):
        self._data += data
        if len(self._data) > self._mss * 2:
            self._data = self._data[:self._mss] + self._data[-int(self._mss):]
            self._elided = True

    def get_bufstr(self):
        if self._elided:
            return '\n'.join([
                self._data[:self._mss],
                '...[elided]...',
                self._data[-int(self._mss):],
            ])
        return self._data


def str_form(bytestr):
    try:
        return bytestr.decode('utf-8', errors='backslashreplace')
    except UnicodeDecodeError:
        try:
            import chardet  # pylint: disable=import-outside-toplevel
            try:
                return bytestr.decode(chardet.detect(bytestr)['encoding'])
            except UnicodeDecodeError:
                pass
        except ImportError:
            pass
    except AttributeError:
        return bytestr  # already a string


class RunningCommand:
    def __init__(self, popen_obj, tag, stdInput, errorOK, logger,
                 max_buffer_size=MAX_COMMAND_RESPONSE_BUFFER_SIZE):
        self.p = popen_obj
        self.tag = tag
        self.stdInput = stdInput
        self.logger = (lambda r, _l=logger or LOG, e=errorOK:
                       getattr(_l, "error" if r and not e else "info"))
        self._bufsize = max_buffer_size
        self._output = EndsBuffer(self._bufsize / 2)
        self._errors = EndsBuffer(self._bufsize / 2)
        self._partial = False
        self._completed = False

    def checkIfCompleted(self):
        self._output.add_str(str_form(self.p.stdout.read()))
        self._errors.add_str(str_form(self.p.stderr.read()))
        return self.p.poll() is not None

    def outputToDate(self):
        o = self._output.get_bufstr()
        e = self._errors.get_bufstr()
        self._output = EndsBuffer(self._bufsize / 2)
        self._errors = EndsBuffer(self._bufsize / 2)
        self._partial = True
        return o, e

    def waitForCompletion(self):
        # n.b. communicate returns byte strings, in an unknown encoding, although
        # with older python, they could also be strings
        o, e = fmap(str_form, self.p.communicate())
        self._output.add_str(o)
        self._errors.add_str(e)
        self._completed = True
        return self

    def getResult(self, omitOutput=False):
        if not self._completed:
            self.waitForCompletion()
        outs, errs = self._output.get_bufstr(), self._errors.get_bufstr()
        output_lines = trim_visible_output_to_lines(self.stdInput, outs)
        if omitOutput:
            output_lines = [line if line.startswith("<<INPUT>>") else "<<REMOVED>>" for line in output_lines]

        for line in (output_lines
                     + list(mark_error_output(errs))
                     + ["ExitCode: %d" % self.p.returncode]):
            lstr = self.tag + ' ' + line
            self.logger(self.p.returncode)(lstr)

        # NOTE This is not a Nydus Result; it must be converted
        return self.p.returncode, outs.strip(), errs.strip()


def startCommand(cmd, tag, useShell=False, stdInput=None, errorOK=False,
                 logger=None, omitString=None, env=None):
    """Starts the specified command; no Shell.  The cmd is expected to be an
       array of strings, although it is also possible to specify
       simply a string if there are no arguments to the command.

       The tag value is only used to annotate log lines for
       distinctive tagging in the logging output.

       If errorOK is True then a non-zero return code is still only
       logged as "info" instead of "error".  The ProcOpResult will
       accurately reflect the result of the operation regardless of
       the errorOK argument.

       If omitString is specified, it is removed from any logging output.

       The command is run asynchronously in a separate process.  The
       return value is a RunningCommand object that supports a
       .checkIfCompleted() method to test for subprocess completion
       (non-blocking) and a .getResult() to get the ProcOpResult of a
       completed process (will block until completion if not already
       completed).
    """
    # if cmd is a string, leave as is to correctly obfuscate omitString
    # If cmd is a bytestring, make sure omitString and the replacement text are also bytestrings
    log_cmd = ' '.join(cmd) if isinstance(cmd, list) else cmd
    replace_text = "<<REMOVED>>"
    if isinstance(log_cmd, bytes):
        if omitString is not None and isinstance(omitString, str):
            omitString = omitString.encode('utf-8')
        replace_text = b'<<REMOVED>>'

    if omitString:
        log_cmd = log_cmd.replace(omitString, replace_text)

    if logger is None:
        logger = LOG
    logger.info(tag + " RUN: " + str(log_cmd))  # pylint: disable=logging-not-lazy

    # pylint: disable=consider-using-with
    p = subprocess.Popen(cmd,
                         shell=useShell,
                         stdout=subprocess.PIPE,
                         stdin=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         env=env)
    if stdInput:
        p.stdin.write(stdInput.encode())
    return RunningCommand(p, tag, stdInput, errorOK, logger)


def runCommand(cmd, tag, useShell=False, stdInput=None, errorOK=False,
               logger=None, omitString=None, omitOutput=False, env=None):
    """Runs the specified command and waits for completion; no Shell.  The
       cmd is expected to be an array of strings, although it is also
       possible to specify simply a string if there are no arguments
       to the command.

       The tag value is only used to annotate log lines for
       distinctive tagging in the logging output.

       If errorOK is True then a non-zero return code is still only
       logged as "info" instead of "error".  The ProcOpResult will
       accurately reflect the result of the operation regardless of
       the errorOK argument.

       If omitString is specified, it is removed from any logging output.
    """
    return startCommand(cmd, tag,
                        useShell=useShell,
                        stdInput=stdInput,
                        errorOK=errorOK,
                        logger=logger,
                        omitString=omitString,
                        env=env) \
        .waitForCompletion() \
        .getResult(omitOutput=omitOutput)


def run_command_pipe(cmd, useShell=False):
    """Used to run commands that make use of | """
    p = subprocess.Popen(cmd, shell=useShell, stdout=subprocess.PIPE)
    with p:
        o, e = p.communicate()
        return p.returncode, o, e


def run_powershell(command: str, tag: str, quiet: bool = False) -> Tuple[int, str, str]:
    """Execute the code in ``command`` using PowerShell.

    :param command: Code to execute
    :param tag: Short string that will be added to logs to improve searchability.
    :param quiet: Omit the output of the command from the result.
    :return: Tuple containing (exit_code, stdout, stderr)
    """
    # 1. Python 3.5 Windows: cannot use context manager - it does not unlink the file on exit.
    # 2. Encode in UTF-8 with byte order mark (BOM) so PowerShell 5.1 (Windows 2016) sees the file is UTF-8. Without
    #    this, the file will be encoded with the default system encoding which can be changed by administrators and
    #    which, by default, cannot encode all Unicode characters properly. Encoding as UTF-8 without BOM does not work
    #    because standard output cannot be decoded as UTF-8.
    #    - Encoding PowerShell scripts:
    #      https://docs.microsoft.com/en-us/powershell/scripting/dev-cross-plat/
    #      vscode/understanding-file-encoding?view=powershell-7.1
    #    - utf-8-sig encoding: https://docs.python.org/3.5/library/codecs.html#encodings-and-unicode
    #    - Default encoding (locale.getpreferredencoding()): https://docs.python.org/3.5/library/functions.html#open
    # pylint: disable=consider-using-with
    temp = tempfile.NamedTemporaryFile(mode='w+t', encoding='utf-8-sig', suffix='.ps1', delete=False)
    try:
        temp.write(command)
        temp.close()
        if not quiet:
            LOG.debug("Running PowerShell command: %s", command)
        return run_powershell_file(temp.name, tag, quiet)
    finally:
        os.unlink(temp.name)


def run_shell_script_file(script_file: Union[str, Path], tag: str,
                          quiet: bool = False, script_file_args: List[str] = None,
                          stdin: str = None) -> Tuple[int, str, str]:
    """Invoke Shell script, passing just ``filename`` or ``filepath`` and a list of arguments for execution of the file

    :param script_file: The path of the script file to execute
    :param tag: Short string that will be added to logs to improve searchability.
    :param quiet: Omit the output of the command from the result.
    :param script_file_args: List of string arguments if required while execution of the file
    :param stdin: string to pass through standard input
    :return: Tuple containing (exit_code, stdout, stderr)
    """
    command = ['bash', str(script_file)]
    if script_file_args:
        for arg in script_file_args:
            command.append(str(arg))
    return runCommand(command, tag, stdInput=stdin, omitOutput=quiet)


def run_powershell_file(script_file: Union[str, Path], tag: str,
                        quiet: bool = False, script_file_args: List[str] = None,
                        stdin: str = None) -> Tuple[int, str, str]:
    """Invoke PowerShell, passing just ``filename`` or ``filename`` and a list of arguments for execution of the file

    :param script_file: The path of the script file to execute
    :param tag: Short string that will be added to logs to improve searchability.
    :param quiet: Omit the output of the command from the result.
    :param script_file_args: List of string arguments if required while execution of the file
    :param stdin: string to pass through standard input
    :return: Tuple containing (exit_code, stdout, stderr)
    """
    command = ['powershell.exe', '-ExecutionPolicy', 'Unrestricted', '-File', str(script_file)]
    if script_file_args:
        for arg in script_file_args:
            command.append(str(arg))
    return runCommand(command, tag, stdInput=stdin, omitOutput=quiet)


def run_uapi_command(cmd_list: Union[str, List[str]], description: str, op_name: str,
                     use_shell: bool = False, omit_string: str = None) -> Tuple[bool, str, str]:
    """Run a command on the target machine

    :param cmd_list: string or list of strings that represents a command to be executed
    :param description: short description of what the command is doing
    :param op_name: name of the operation that is running the command
    :param use_shell: indicator that the command should be executed in a shell
    :param omit_string: a string to be removed from any logging output.
    :return: Tuple containing (exit_code, stdout, stderr)
    """
    cmd_list_str = str(cmd_list)
    if omit_string:
        cmd_list_str = cmd_list_str.replace(omit_string, "<<REMOVED>>")
    LOG.debug("run_uapi_command %s cmd_list: %s", op_name, cmd_list_str)
    exit_code, outs, errs = runCommand(cmd_list, description, useShell=use_shell, omitString=omit_string)
    LOG.debug("%s_result- %s - %s - %s", description, exit_code, outs, errs)
    return exit_code, outs, errs


def run_multiple_uapi_commands(cmds_list: List[Tuple[str, Union[str, List[str]]]], op_name: str,
                               use_shell: bool = False, omit_string: str = None) -> Tuple[bool, str, str]:
    """Run a command on the target machine

        :param cmds_list: List of tuples with description-command pairs
        :param op_name: name of the operation that is running the command
        :param use_shell: indicator that the command should be executed in a shell
        :param omit_string: a string to be removed from any logging output.
        :return: Tuple containing (exit_code, stdout, stderr)
        """
    exit_code, outs, errs = 0, '', ''
    for description, cmd in cmds_list:
        exit_code, outs, errs = run_uapi_command(cmd, description, op_name, use_shell=use_shell,
                                                 omit_string=omit_string)
        if exit_code != 0:
            return exit_code, outs, errs
    return exit_code, outs, errs


def start_powershell_file(script_file: Union[str, Path]) -> Tuple[int, str, str]:
    """
    Invoke PowerShell, passing just ``filename`` or ``filename`` and a list of arguments for execution of the file.
    Do not wait for the sub-process to complete. The exit code returned in the tuple only indicates whether or
    not the process was successfully started.

    :param script_file: The path of the script file to execute
    :param tag: Short string that will be added to logs to improve searchability.
    :param quiet: Omit the output of the command from the result.
    :param script_file_args: List of string arguments if required while execution of the file
    :return: Tuple containing (exit_code, stdout, stderr)
    """

    params = list2cmdline(['-ExecutionPolicy', 'Unrestricted', '-File', str(script_file)])
    LOG.info("Staring PowerShell: powershell.exe %s", params)
    code = 0 if shell_start('powershell.exe', params=params) else 1
    return code, '', ''


def start_powershell(command: str) -> Tuple[int, str, str]:
    """Execute the code in ``command`` using PowerShell.

    :param command: Code to execute
    :param tag: Short string that will be added to logs to improve searchability.
    :param quiet: Omit the output of the command from the result.
    :return: Tuple containing (exit_code, stdout, stderr)
    """

    # Because the process started by this command could (and probably will) survive after this process has exited,
    # we can't delete the temporary file, here.
    # pylint: disable=consider-using-with
    temp = tempfile.NamedTemporaryFile(mode='w+t', suffix='.ps1', delete=False)
    temp.write(command)
    temp.close()

    LOG.debug("Running Powershell command: %s", command)
    return start_powershell_file(temp.name)

Anon7 - 2022
AnonSec Team