AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 18.188.205.249
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /var/opt/nydus/ops/customer_local_ops/control_panel/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /var/opt/nydus/ops/customer_local_ops/control_panel/windows_plesk.py
# -*- coding: utf-8 -*-
from subprocess import list2cmdline
from typing import Any, Dict, Tuple, Union, List

import logging
import glob
import json
import os

from customer_local_ops import OpType, ResourceType, NydusResult
from customer_local_ops.operating_system.windows import Windows, Windows2016, Windows2019, Windows2022
from customer_local_ops.control_panel import SCRIPT_BASEPATH
from customer_local_ops.control_panel.plesk import OSPlesk
from customer_local_ops.util.execute import (runCommand, run_powershell_file, run_uapi_command,
                                             run_multiple_uapi_commands, start_powershell_file)
from customer_local_ops.util.retry import Retry

# Module-level logger, named after this module
LOG = logging.getLogger(__name__)
# Default installation directory checked first by get_plesk_dir (Plesk 17 layout)
PLESK_DIR_17 = 'C:\\Program Files (x86)\\Plesk\\'
# Default installation directory checked second by get_plesk_dir (Plesk 12 layout)
PLESK_DIR_12 = 'C:\\Program Files (x86)\\Parallels\\Plesk\\'

# Number of seconds for Nydus to wait before retrying the previous workflow step if a DLL conflict was detected
# NOTE(review): not referenced in the visible part of this module - confirm it is still needed
DLL_CONFLICT_COMMAND_RETRY_INTERVAL = 10
# File removed by WindowsPlesk._check_for_dll_conflict once a retried command succeeds
# (presumably written by plesk_fix_dll_conflict.ps1 - confirm)
PLESK_FIX_DLL_CONFLICT_SCRIPT_LOG_FILE = r'C:\Windows\TEMP\plesk-fix-dll-conflict.log'

# Shape of a command specification: a single command string, a list of command strings, or a list of
# (description, command) pairs where the command is a string or an argument list
# NOTE(review): not referenced in the visible part of this module
CmdsType = Union[str, List[str], List[Tuple[str, Union[str, List[str]]]]]


def has_dll_version_conflict(text: str) -> bool:
    """
    Check to see if the specified text contains an indication of the Python/PHP DLL conflict

    Example of the full message whose leading fragment is searched for::

        'vcruntime140.dll' 14.0 is not compatible with this PHP build linked with 14.16 in Unknown on line 0

    :param text: The text to be checked. ``None`` and ``bytes`` are tolerated so that a missing or
                 undecoded output stream is reported as "no conflict" instead of raising TypeError
    :return: True if the conflict is detected; otherwise, False
    """
    if text is None:
        text = ''
    elif isinstance(text, bytes):
        # Undecodable bytes are replaced rather than raised on; the marker itself is plain ASCII
        text = text.decode('utf-8', errors='replace')

    conflict_detected = "'vcruntime140.dll' 14.0 is not compatible" in text
    if conflict_detected:
        LOG.debug("DLL conflict detected")
    else:
        LOG.debug("No DLL conflict detected")
    return conflict_detected


class WindowsPlesk(Windows, OSPlesk):
    """
    Plesk Customer Local Ops for the Windows OS.
    All function names should contain 'plesk' so as not to override the OS ops
    """
    # Op category declared for this class
    op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM
    # Plesk installation directory; None until get_plesk_dir() resolves and caches it
    plesk_dir = None
    # Fragments of command output describing failures worth retrying.
    # NOTE(review): not read anywhere in this class; presumably consumed by the inherited
    # _result_handler - confirm against the parent implementation
    RETRYABLE_ERRS = ['The Plesk administrator password cannot be changed until the server cloning is finished',
                      'No connection could be made because the target machine actively refused it',
                      'Could not resolve host']

    def get_plesk_dir(self) -> str:
        """Locate (and cache on the instance) the Plesk installation directory

        Lookup order: the ``plesk_dir`` environment variable, the Plesk 17 default path, the Plesk 12
        default path, and finally a recursive search below ``C:\\Program Files (x86)``.

        :raises RuntimeError: If a plesk path cannot be found
        :return: full path to plesk installation
        """
        if self.plesk_dir is None:
            env_dir = os.getenv('plesk_dir')
            if env_dir:
                # An explicit environment setting wins over any default location
                self.plesk_dir = env_dir
            else:
                for default_dir in (PLESK_DIR_17, PLESK_DIR_12):
                    if os.path.exists(default_dir):
                        self.plesk_dir = default_dir
                        break
                else:
                    # Neither default exists: go find a plesk!
                    found = glob.glob('C:\\Program Files (x86)\\**\\Plesk\\', recursive=True)
                    if found and isinstance(found, list):
                        self.plesk_dir = found[0]

        if self.plesk_dir is None:
            raise RuntimeError("plesk path could not be found")
        LOG.info("get_plesk_dir: %s", self.plesk_dir)
        return self.plesk_dir

    def license_plesk(self, activation_key: str, *args: Any,
                      intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Run license utility on local vm with activation key

        :param activation_key: Key value passed back from license plesk op on hfs executor
        :param args: Not used by this method
        :param intermediate_result: Dict containing metadata for retries
        :return: whatever _result_handler produces for the command outcome (a Nydus result tuple, or Retry)
        """
        LOG.info("install plesk license on '%s'", str(self))
        op_name = 'license_plesk'
        full_command = self.get_path_plesk('license.exe')
        LOG.info("full command: %s", full_command)
        # The command is built as a single string and run with use_shell=True, so quote the key the same
        # way enable_plesk quotes its password argument. A key without whitespace or quotes is unchanged.
        key_arg = list2cmdline([str(activation_key)])
        cmd = '"{full_command}" --install {activation_key}'.format(full_command=full_command,
                                                                   activation_key=key_arg)
        exit_code, outs, errs = run_uapi_command(cmd, 'set Plesk License', op_name, use_shell=True)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)

    # pylint: disable=too-many-locals
    def enable_plesk(self, vm_ip: str, vm_resource: str, plesk_user: str, plesk_pass: str,
                     *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Enable plesk on local server

        First asks init_conf.exe whether Plesk is already configured, then runs the enablement commands
        with either ``--init`` (not configured) or ``--update`` (configured).

        :param vm_ip: External IP address of the server
        :param vm_resource: The resource name for the third-party hosting provider
        :param plesk_user: User name to be used on Plesk instance
        :param plesk_pass: Password to be used on Plesk instance
        :param intermediate_result: Dict containing metadata for retries
        :raises DecryptError: if there is a problem with decrypting the password
        :return: tuple with success, error data
        """
        LOG.info("enable plesk for VM IP %s", vm_ip)
        op_name = self.OP_ENABLE_PLESK
        password_arg = list2cmdline([self.decrypt(plesk_pass)])
        ops_map = self.PLESK_OPS_RESOURCE_ATTRIBUTE_MAP[ResourceType(vm_resource)][op_name]
        init_conf_cmd = self.get_path_plesk('init_conf.exe')

        # Probe the current state; a Retry from the handler is passed straight back to the caller
        probe_code, probe_outs, probe_errs = run_uapi_command(
            '"{}" --check-configured'.format(init_conf_cmd), "Check Plesk configured", op_name, use_shell=True)
        probe_result = self._result_handler(probe_code, probe_outs, probe_errs, op_name, intermediate_result)
        if isinstance(probe_result, Retry):
            return probe_result
        # A successful check means Plesk is configured (update it); otherwise initialise it
        init_conf_setup_flag = '--update' if probe_result[0] else '--init'

        password_strength_step = (
            'set minimum password strength',
            '"{server_pref}" -u -min_password_strength medium'.format(
                server_pref=self.get_path_plesk('server_pref.exe')))
        setup_step = (
            'setup Plesk',
            ops_map[self.SETUP_CMD].format(
                init_conf_cmd='"{}"'.format(init_conf_cmd),
                init_conf_setup_flag=init_conf_setup_flag,
                plesk_user=plesk_user, password=password_arg))
        poweruser_step = (
            'set Poweruser',
            '"{poweruser_cmd}" --on'.format(poweruser_cmd=self.get_path_plesk('poweruser.exe')))
        reconfigurator_step = (
            'reconfigurator',
            '"{reconfigurator}" /check=Services /no-gui'.format(
                reconfigurator=self.get_path_plesk('reconfigurator.exe', ['admin', 'bin'])))
        sslcerts_step = (
            'Repair sslcerts',
            '"{}" repair web -sslcerts'.format(self.get_path_plesk('plesk.exe')))
        enable_cmds = [password_strength_step, setup_step, poweruser_step, reconfigurator_step, sslcerts_step]

        exit_code, outs, errs = run_multiple_uapi_commands(enable_cmds, op_name, omit_string=password_arg)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)

    def site_list_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any:
        """ Retrieve a list of Plesk sites

        Runs the plesk_site_list.ps1 script with the Plesk directory as its argument and parses the
        script output as JSON.

        :param args: Not used by this method
        :param intermediate_result: Dict containing metadata for retries
        :return: the decoded JSON output of the script, an empty string when the script printed nothing,
                 or the _result_handler outcome when the script exited non-zero
        """
        LOG.info("get site list")
        op_name = 'site_list_plesk'

        plesk_dir_arg = [str(self.get_plesk_dir())]

        exit_code, outs, errs = run_powershell_file(SCRIPT_BASEPATH / 'powershell' / 'plesk_site_list.ps1',
                                                    op_name, False, plesk_dir_arg)
        if exit_code != 0:
            return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)
        # Log through the module logger (not the root logger) so the record carries this module's name
        LOG.info("%s", outs)
        # Treat whitespace-only output like empty output; json.loads would raise on it
        if not outs or not outs.strip():
            return ''
        return json.loads(outs)

    def server_prep_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """ Install Plesk on a server by running the plesk_server_prep.ps1 script

        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        op_name = 'server_prep_plesk'
        LOG.info("install plesk on '%s'", str(self))

        prep_script = SCRIPT_BASEPATH / 'powershell' / 'plesk_server_prep.ps1'
        script_code, script_outs, script_errs = run_powershell_file(prep_script, 'Prep Plesk')
        return self._result_handler(script_code, script_outs, script_errs, op_name, intermediate_result)

    def get_client_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any:
        """ Get Plesk SSO URL

        Runs ``plesk bin admin --get-login-link`` and returns the first line of its output, encrypted.

        :param args: Not used by this method
        :param intermediate_result: Dict containing metadata for retries
        :return: encrypted SSO URL string, or the _result_handler outcome when the command exited non-zero
        """
        op_name = 'get_client_plesk'
        exit_code, outs, errs = runCommand(["plesk", "bin", "admin", "--get-login-link"], 'get sso link')
        if exit_code != 0:
            return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)

        # Only the first line is the link. Strip it so a Windows '\r\n' line ending (or other
        # surrounding whitespace) does not become part of the encrypted URL.
        sso_url = str(outs.split('\n')[0]).strip()
        return self.encrypt(sso_url).decode('utf-8')

    def change_hostname_plesk(self, hostname: str, *args: Any,
                              intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """ Change hostname on Plesk

        :param hostname: New name to be assigned to the host
        :param args: Not used by this method
        :param intermediate_result: Dict containing metadata for retries
        :return: whatever _result_handler produces for the command outcome (a Nydus result tuple, or Retry)
        """
        op_name = 'change_hostname_plesk'
        # The command is built as a single string and run with use_shell=True, so quote the hostname the
        # same way enable_plesk quotes its password argument. A value without whitespace or quotes is
        # unchanged.
        hostname_arg = list2cmdline([str(hostname)])
        rcmd = '"{plesk_path}" --update -hostname {hostname}'.format(plesk_path=self.get_path_plesk('server_pref.exe'),
                                                                     hostname=hostname_arg)
        exit_code, outs, errs = run_uapi_command(rcmd, 'set Plesk hostname', op_name, use_shell=True)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)

    def change_admin_password_plesk(self, plesk_admin_pass: str, *args: Any,
                                    intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """ Change admin password on Plesk

        The password is decrypted and passed to the Plesk admin utility as its own argument-list entry.

        :param plesk_admin_pass: Encrypted password for Plesk admin user
        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        op_name = 'change_admin_password_plesk'
        password = self.decrypt(plesk_admin_pass)

        # Perform operations on Control Panel
        plesk_bin = self.get_path_plesk(None, ['bin', 'admin'])
        # omit_string is handed the clear-text password (presumably so it is kept out of logged output)
        exit_code, outs, errs = run_uapi_command(
            [plesk_bin, '--set-admin-password', '-passwd', password],
            'set Plesk admin password', op_name, omit_string=password)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)

    def _check_for_dll_conflict(self, exit_code: int, result: Dict[str, Any],
                                intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """
        Check to see if the output of the previous command indicates that a DLL version conflict exists and, if so,
        attempt remediation.

        When the previous command succeeded and an earlier pass flagged ``delete_log_file`` in
        ``intermediate_result``, the remediation log file is removed on a best-effort basis.

        :param exit_code: The exit code from the previous command
        :param result: The result dictionary from the previous command; its ``errs`` entry is inspected
        :param intermediate_result: Dict containing metadata for retries; updated in place with
                                    ``delete_log_file`` when remediation is started
        :return: ``(True, result)`` if the previous command succeeded; a ``Retry`` wrapping the intermediate
                 result if a conflict was detected and the remediation script was started successfully;
                 otherwise ``(False, result)``
        """
        if exit_code == 0:
            if intermediate_result is not None and intermediate_result.get('delete_log_file', False):
                try:
                    os.unlink(PLESK_FIX_DLL_CONFLICT_SCRIPT_LOG_FILE)
                except OSError as ex:
                    # Best effort: a missing (or still locked) log file must not fail a successful op
                    LOG.debug("Could not delete %s: %s", PLESK_FIX_DLL_CONFLICT_SCRIPT_LOG_FILE, ex)
            return True, result

        # 'errs' may be present but None; treat that the same as no error text
        if not has_dll_version_conflict(result.get('errs') or ''):
            return False, result

        LOG.error("DLL conflict detected; attempting to remediate")
        code, _, _ = start_powershell_file(SCRIPT_BASEPATH / 'powershell' / 'plesk_fix_dll_conflict.ps1')
        if code != 0:
            # Remediation could not be started, so report the original failure
            return False, result

        if intermediate_result is None:
            intermediate_result = {}
        # Remember to clean up the remediation log once the retried command succeeds
        intermediate_result['delete_log_file'] = True
        return Retry(intermediate_result)

    def _result_handler(self, exit_code: int, outs: str, errs: str, op_name: str,
                        intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Take the result from a run command and check for retryable errors.
        If found, return a Retry. If not, return a Nydus result tuple.

        On failure the DLL-conflict check runs first; if it starts remediation its Retry is returned,
        otherwise the parent class handler decides the outcome. On success the DLL-conflict helper is
        invoked only for its clean-up of the remediation log file.

        :param exit_code: The exit code from the executed command
        :param outs: The stdout output from the executed command
        :param errs: The stderr output from the executed command
        :param op_name: The name of the op
        :param intermediate_result: Dict containing metadata for retries
        :return: A Nydus result tuple, or Retry
        """
        success, result = self.build_result_from_cmd_output(exit_code, outs, errs, op_name, op_name + " succeeded")
        if success:
            if exit_code == 0:
                # The log-file clean-up in _check_for_dll_conflict only runs for exit code 0, which was
                # never reached when the helper was called for failures only. With exit code 0 the helper
                # does nothing but that clean-up, so its return value is not needed here.
                # NOTE(review): assumes build_result_from_cmd_output reports success for exit code 0 - confirm
                self._check_for_dll_conflict(exit_code, result, intermediate_result=intermediate_result)
            return success, result

        dll_res = self._check_for_dll_conflict(exit_code, result, intermediate_result=intermediate_result)
        if isinstance(dll_res, Retry):
            return dll_res
        return super()._result_handler(exit_code, outs, errs, op_name, intermediate_result)


class Windows2016Plesk(Windows2016, WindowsPlesk):
    """WindowsPlesk ops combined with the Windows2016 operating-system ops; defines nothing of its own."""


class Windows2019Plesk(Windows2019, WindowsPlesk):
    """WindowsPlesk ops combined with the Windows2019 operating-system ops; defines nothing of its own."""


class Windows2022Plesk(Windows2022, WindowsPlesk):
    """WindowsPlesk ops combined with the Windows2022 operating-system ops; defines nothing of its own."""

Anon7 - 2022
AnonSec Team