AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 3.142.43.151
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /var/opt/nydus/ops/customer_local_ops/control_panel/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /var/opt/nydus/ops/customer_local_ops/control_panel/linux_plesk.py
# -*- coding: utf-8 -*-

from typing import Dict, Any

import json
import logging
import os
import re
import shlex
import sys

from enum import IntEnum
from fileinput import FileInput

from customer_local_ops import OpType, ResourceType, NydusResult
from customer_local_ops.operating_system.linux import (AlmaLinux8, AlmaLinux9, Linux, CentOS, CentOS6, CentOS7, Debian,
                                                       Debian8, Debian10, Debian11, Debian12, Ubuntu1604,
                                                       Ubuntu2004, Ubuntu2204)
from customer_local_ops.control_panel.plesk import OSPlesk
from customer_local_ops.util.execute import runCommand, run_uapi_command, run_multiple_uapi_commands
from customer_local_ops.util.retry import Retry


# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Candidate Plesk installation directories probed by LinuxPlesk.get_plesk_dir().
# Per the note in get_plesk_dir: /opt/psa is used on Ubuntu, /usr/local/psa on CentOS.
PLESK_DIR_1 = "/opt/psa"
PLESK_DIR_2 = "/usr/local/psa"


class LinuxPlesk(Linux, OSPlesk):
    """
    Plesk Customer Local Ops for the Linux OS.
    All function names should contain 'plesk' so as not to override the OS ops
    """
    # Operation category reported for ops defined on this class.
    op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM

    # Directories probed, in order, by get_plesk_dir(). Distro-specific subclasses
    # in this module narrow the list to a single location.
    PLESK_INSTALL_LOCATIONS = [
        PLESK_DIR_1,
        PLESK_DIR_2,
    ]
    # Cache for the directory found by get_plesk_dir(); None until the first successful lookup.
    plesk_dir = None
    # Error-message fragments regarded as transient.
    # NOTE(review): not referenced anywhere in this module; presumably consumed by the
    # shared result handling to decide whether an op should be retried - confirm.
    RETRYABLE_ERRS = ['The Plesk administrator password cannot be changed until the server cloning is finished',
                      'No connection could be made because the target machine actively refused it',
                      'Could not resolve host']

    def get_plesk_dir(self) -> str:
        """Locate the Plesk installation directory, caching the answer.

        The candidates in ``PLESK_INSTALL_LOCATIONS`` are checked in order and the
        first one that exists as a directory wins. The hit is stored on
        ``self.plesk_dir`` so later calls skip the filesystem check.

        :raises RuntimeError: if none of the candidate directories exists
        :return: full path to plesk installation
        """
        # Plesk currently lives in /opt/psa on Ubuntu and /usr/local/psa on CentOS.
        # The candidate list could be made configurable so that new locations can be
        # added without touching the code.
        if self.plesk_dir is None:
            for candidate in self.PLESK_INSTALL_LOCATIONS:
                if not os.path.isdir(candidate):
                    continue
                LOG.info(candidate)
                self.plesk_dir = candidate
                break
            else:
                # Loop ran to completion without finding an existing directory.
                raise RuntimeError("plesk path could not be found")
        return self.plesk_dir

    def license_plesk(self, activation_key: str, *args: Any,
                      intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Install a Plesk license on the local VM using the license utility.

        :param activation_key: Key value passed back from license plesk op on hfs executor
        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        op_name = 'license_plesk'
        LOG.info("install plesk license on '%s'", str(self))

        license_bin = self.get_path_plesk('license')
        LOG.info("full command: %s", license_bin)

        # Argument-list form (no shell), so the activation key needs no quoting.
        install_cmd = [license_bin, '--install', activation_key]
        exit_code, outs, errs = run_uapi_command(install_cmd, 'set Plesk License', 'license_plesk')
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    # pylint: disable=too-many-locals
    def enable_plesk(self, vm_ip: str, vm_resource: str, plesk_user: str, plesk_pass: str,
                     *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Run the initial Plesk configuration sequence on the local VM.

        The method first asks ``init_conf --check-configured`` whether Plesk has been
        set up before, then runs a list of shell commands: password-strength policy,
        the resource-specific setup command, a configuration re-check, Power User mode,
        automatic system package updates, and - depending on the resource type - hiding
        the internal IP and disabling the session IP check.

        :param vm_ip: External IP address of the VM
        :param vm_resource: The resource name for the third-party hosting provider
        :param plesk_user: User name to be used on Plesk instance
        :param plesk_pass: Password to be used on Plesk instance
        :param intermediate_result: Dict containing metadata for retries
        :raises DecryptError: if there is a problem with decrypting the password
        :return: tuple with success, error data
        """
        LOG.info("enable plesk")
        op_name = self.OP_ENABLE_PLESK
        # Shell-quoted because every command below runs through the shell.
        password_arg = shlex.quote(self.decrypt(plesk_pass))
        ops_map = self.PLESK_OPS_RESOURCE_ATTRIBUTE_MAP[ResourceType(vm_resource)][op_name]
        init_conf_cmd = self.get_path_plesk('init_conf')

        check_configured_cmd = '{} --check-configured'.format(init_conf_cmd)
        configured_rc, _, _ = run_uapi_command(check_configured_cmd, "Check Plesk configured", op_name,
                                               use_shell=True)
        # Exit code 0 means Plesk is already configured, so update instead of initialising.
        init_conf_setup_flag = '--update' if configured_rc == 0 else '--init'

        password_strength_cmd = '{server_pref} -u -min_password_strength medium'.format(
            server_pref=self.get_path_plesk('server_pref'))
        setup_cmd = ops_map[self.SETUP_CMD].format(
            init_conf_cmd=init_conf_cmd,
            init_conf_setup_flag=init_conf_setup_flag,
            plesk_user=plesk_user, password=password_arg)
        poweruser_cmd = '{set_poweruser_cmd} --on'.format(set_poweruser_cmd=self.get_path_plesk('poweruser'))
        auto_updates_cmd = ('plesk db "INSERT INTO misc(param, val) ' +
                            'VALUES(\'automaticSystemPackageUpdates\', \'true\') ON DUPLICATE KEY UPDATE val = \'true\';"')

        prep_cmds = [
            ('set minimum password strength', password_strength_cmd),
            ('setup Plesk', setup_cmd),
            ('check config', check_configured_cmd),
            ('set Poweruser', poweruser_cmd),
            ('set auto updates', auto_updates_cmd),
        ]

        if ops_map[self.RUN_HIDE_INTERNAL_IP]:
            hide_ip_cmd = ('''sed -i'' 's/blacklist=".*"/blacklist="{vm_ip}"/' '''
                           '/usr/local/psa/admin/conf/panel.ini').format(vm_ip=vm_ip)
            prep_cmds.extend([
                ('hide internal ip', hide_ip_cmd),
                ('reread ips', 'plesk bin ipmanage --reread'),
            ])

        if ops_map[self.RUN_DISABLE_SESSION_IP_CHECK]:
            disable_ip_check_cmd = ('plesk db "'
                                    'INSERT INTO misc(param,val) '
                                    'VALUES(\'disable_check_session_ip\', \'true\') '
                                    'ON DUPLICATE KEY UPDATE val = \'true\';"')
            prep_cmds.append(('disable session ip check', disable_ip_check_cmd))

        # omit_string is handed the quoted password; presumably used to keep it out of logs.
        exit_code, outs, errs = run_multiple_uapi_commands(prep_cmds, op_name, use_shell=True, omit_string=password_arg)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    def site_list_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any:
        """Retrieve the Plesk subscriptions and sites, with selected details per site.

        Runs ``subscription --list`` and ``site --list``, then ``site --info <name>`` for
        each site. From the info output only the fields listed in ``required_fields`` are
        kept, stored under their mapped keys next to the site ``name``.

        Blank lines in the list output are ignored so that no ``--info`` call is made
        for an empty site name.

        :param intermediate_result: Dict containing metadata for retries
        :return: dict with 'subscriptions' (list of names) and 'sites' (list of dicts),
                 or the result-handler output for the first command that fails
        """
        LOG.info("get site list")

        site_list = {}
        exit_code, outs, errs = runCommand(self.get_path_plesk('subscription') + " --list",
                                           "site_list_plesk: get_subscriptions", useShell=True)
        if exit_code != 0:
            return self._result_handler(exit_code, outs, errs, 'site_list_plesk: get_subscriptions',
                                        intermediate_result=intermediate_result)
        # A trailing newline (or empty output) yields empty strings after split; drop them.
        site_list['subscriptions'] = [name for name in outs.split('\n') if name.strip()]
        site_cmd = self.get_path_plesk('site')

        exit_code, outs, errs = runCommand(site_cmd + " --list", "site_list_plesk: get_sites", useShell=True)
        if exit_code != 0:
            return self._result_handler(exit_code, outs, errs, 'site_list_plesk: get_sites',
                                        intermediate_result=intermediate_result)
        site_names = [name for name in outs.split('\n') if name.strip()]

        # Label printed by `site --info` -> key used in the returned site dict.
        required_fields = {'FTP Login': 'ftp_login',
                           'IP address': 'ip_address',
                           'Disk space used by httpdocs': 'diskused',
                           'Hosting type': 'webspace'}

        sites = []
        for site in site_names:
            site_data = {'name': site}
            exit_code, outs, errs = runCommand(site_cmd + " --info " + site, "site_list_plesk: get_site_info",
                                               useShell=True)
            if exit_code != 0:
                return self._result_handler(exit_code, outs, errs, 'site_list_plesk: get_site_info',
                                            intermediate_result=intermediate_result)
            for line in outs.split('\n'):
                # Split on the first colon only, so values that contain colons themselves
                # (e.g. IPv6 addresses) are kept instead of being discarded.
                data = [x.strip() for x in line.split(':', 1)]
                LOG.info("data: %s", data)
                if len(data) == 2 and data[0] in required_fields:
                    site_data[required_fields[data[0]]] = data[1]
            sites.append(site_data)

        site_list['sites'] = sites
        LOG.info(json.dumps(site_list))
        return site_list

    def server_prep_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Download the Plesk installer and install Plesk on this server.

        The steps run through the shell one after another; the sequence stops at the
        first step that exits non-zero, and that step's result is what gets reported.

        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        op_name = 'server_prep_plesk'
        LOG.info("install plesk on '%s'", str(self))

        # The downloaded plesk-installer is kept on disk: it provides compatibility
        # for updates, troubleshooting, etc.
        install_steps = (
            ('Download Plesk', 'wget https://autoinstall.plesk.com/plesk-installer'),
            ('Modify Perms', 'chmod +x plesk-installer'),
            ('Install Plesk', 'sh plesk-installer --select-product-id=plesk --installation-type Typical ' +
             '--select-release-latest --notify-email admin@example.com'),
            ('Delete Docker Interface', 'ip link del docker0'),
        )

        for purpose, cmd in install_steps:
            exit_code, outs, errs = run_uapi_command(cmd, purpose, op_name, use_shell=True)
            if exit_code != 0:
                # Leave the loop with the failing step's output still bound.
                break
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    def get_client_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any:
        """Fetch a Plesk admin login (SSO) link and return it encrypted.

        Only the first line of the ``admin --get-login-link`` output is used.

        :param intermediate_result: Dict containing metadata for retries
        :return: encrypted SSO URL as a UTF-8 string, or tuple with error data
        """
        login_link_cmd = self.get_path_plesk('admin') + " --get-login-link"
        exit_code, outs, errs = runCommand(login_link_cmd, "get sso link", useShell=True)
        if exit_code == 0:
            first_line = outs.split('\n')[0]
            # encrypt() hands back bytes; decode so the caller receives text.
            return self.encrypt(str(first_line)).decode('utf-8')
        return self._result_handler(exit_code, outs, errs, 'get_client_plesk',
                                    intermediate_result=intermediate_result)

    def change_hostname_plesk(self, hostname: str, *args: Any,
                              intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Change the server hostname known to Plesk via ``server_pref --update``.

        :param hostname: The new server hostname
        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        op_name = 'change_hostname_plesk'

        # The command runs through the shell, so the caller-supplied hostname is quoted
        # to stop shell metacharacters from being interpreted. shlex.quote leaves a
        # well-formed hostname (letters, digits, '.', '-') unchanged.
        rcmd = self.get_path_plesk('server_pref') + ' --update' + ' -hostname ' + shlex.quote(hostname)
        exit_code, outs, errs = run_uapi_command(rcmd, 'set Plesk hostname', op_name, use_shell=True)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    def change_admin_password_plesk(self, plesk_admin_pass: str, *args: Any,
                                    intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Set a new password for the Plesk admin user.

        :param plesk_admin_pass: Encrypted password for Plesk admin user
        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        op_name = 'change_admin_password_plesk'
        clear_password = self.decrypt(plesk_admin_pass)

        # Argument-list form (no shell), so the password needs no quoting here.
        set_password_cmd = [
            self.get_path_plesk('init_conf'),
            '--set-admin-password',
            '-passwd',
            clear_password,
        ]
        # omit_string is handed the clear-text password; presumably used to keep it out of logs.
        exit_code, outs, errs = run_uapi_command(
            set_password_cmd, 'set Plesk admin password', op_name, omit_string=clear_password)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    def configure_postfix(self, relay_address: str, op_name: str,
                          intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Configure the postfix MTA to relay through ``relay_address`` and restart it.

        ``/etc/postfix/main.cf`` is rewritten in place:

        * an existing uncommented ``relayhost =`` line is replaced;
        * otherwise the new line is inserted right after the first run of commented-out
          ``relayhost`` lines;
        * if neither is found, the line is appended to the end of the file.

        :param relay_address: IP address or host name of the mail relay to set
        :param op_name: name of the operation being executed
        :param intermediate_result: Dict containing metadata for retries
        :return: result of restarting postfix, or error data if editing the file failed;
                 ``None`` when ``relay_address`` is ``None`` (nothing is changed)
        """
        if relay_address is None:
            # NOTE(review): returns None rather than a NydusResult; callers pass it through.
            return

        class SearchState(IntEnum):
            SEARCHING = 0  # no commented-out relayhost line seen yet
            SKIPPING = 1   # inside a run of commented-out relayhost lines
            DONE = 2       # the new relayhost line has been written

        # In the configuration file distributed with postfix, there are either a number of commented-out
        # relayhost lines or a blank relayhost line. Add our new line immediately after the last commented-out
        # relayhost line or replace the blank relayhost line
        try:
            relayhost_cmt = re.compile(r'^[ \t]*#[ \t]*relayhost[ \t]*=')
            relayhost_blnk = re.compile(r'^[ \t]*relayhost[ \t]*=')
            relayhost_line = "relayhost = [{}]\n".format(relay_address)
            state = SearchState.SEARCHING
            last_line = ''
            # With inplace=True, FileInput redirects sys.stdout into the file being rewritten.
            with FileInput('/etc/postfix/main.cf', inplace=True) as stream:
                for line in stream:
                    last_line = line
                    if relayhost_blnk.match(line):
                        sys.stdout.write(relayhost_line)
                        state = SearchState.DONE
                        continue
                    if relayhost_cmt.match(line):
                        if state == SearchState.SEARCHING:
                            state = SearchState.SKIPPING
                    else:
                        if state == SearchState.SKIPPING:
                            # We've found the first line after the other commented-out relayhost lines
                            # Add the new relayhost line here and set the state to indicate that we've
                            # finished.
                            sys.stdout.write(relayhost_line)
                            state = SearchState.DONE
                    sys.stdout.write(line)
            # If we get here and the state isn't DONE, then just add the line to the end of the file
            if state != SearchState.DONE:
                with open('/etc/postfix/main.cf', 'a', encoding='utf-8') as f:
                    # If the file does not end with a newline, start a fresh line first; otherwise
                    # the setting would be glued onto the last line (possibly into a comment).
                    if last_line and not last_line.endswith('\n'):
                        f.write('\n')
                    f.write(relayhost_line)

        except Exception as ex:  # pylint: disable=broad-except
            LOG.error("cp_os_op %s.configure_postfix result(fail): %s", op_name, ex)
            return self._result_handler(1, '', str(ex), op_name, intermediate_result=intermediate_result)

        LOG.info("restarting postfix")
        my_op_name = op_name + ': restart postfix'
        return self.run_command_and_handle_result(['systemctl', 'restart', 'postfix'], my_op_name,
                                                  intermediate_result=intermediate_result)

    def set_outgoing_email_ip(self, address: str, *args: Any,
                              intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Set Plesk's outgoing e-mail IP address.

        Switches the mail server to the ``explicit-ip`` outgoing mode with ``address``
        as the explicit IPv4 address.

        This only works with Postfix mail server.  See General #6 for more information:
        https://docs.plesk.com/en-US/obsidian/administrator-guide/mail/configuring-serverwide-mail-settings.59430/

        :param address: IP address from which to send e-mail
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: Nydus operation result
        """
        mailserver_bin = self.get_path_plesk(filename='mailserver')
        command = [
            mailserver_bin,
            '--set-outgoing-email-mode', 'explicit-ip',
            '-explicit-ipv4', address,
        ]
        return self.run_command_and_handle_result(
            command, 'set_outgoing_email_ip', intermediate_result=intermediate_result)


class CentOSPlesk(CentOS, LinuxPlesk):
    """Plesk ops for CentOS-family servers; packages are managed with yum."""

    # Only /usr/local/psa is probed on this family.
    PLESK_INSTALL_LOCATIONS = [PLESK_DIR_2]

    def configure_mta(self, payload, unused=None, intermediate_result=None) -> NydusResult:
        """Configure mail transfer agent on a CentOS-based Plesk server

        :param payload: Dict containing op params
        :param unused: ignored by this method
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("%s.configure_mta", self)
        op_name = 'CentOSPlesk.configure_mta'
        return self.do_configure_mta(payload, op_name, intermediate_result=intermediate_result)

    def do_configure_mta(self, payload: Dict[str, Any], op_name: str,
                         intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Install postfix, then point it at the relay named in the payload.

        :param payload: Dict containing op params; 'relay_address' is read from it
        :param op_name: The name of the op, including the classname
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("%s.do_configure_mta", self)
        install_result = self.install_postfix(op_name, intermediate_result=intermediate_result)
        # A Retry or an unsuccessful result is passed straight back to the caller.
        install_incomplete = isinstance(install_result, Retry) or not install_result[0]
        if install_incomplete:
            return install_result

        relay_address = payload.get('relay_address')
        LOG.info("%s %s start relay_address: %s", self.get_op_type().value, op_name, relay_address)
        return self.configure_postfix(relay_address, op_name, intermediate_result=intermediate_result)

    def install_postfix(self, op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Replace sendmail with postfix using yum.

        Runs ``yum clean all``, removes sendmail, then installs postfix. The sequence
        stops at the first command that exits non-zero.

        :param op_name: The name of the op, including the classname
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("%s.install_postfix", self)

        yum_steps = (
            ('yum_clean_all', ['yum', 'clean', 'all']),
            ('remove_sendmail', ['yum', '-y', 'remove', 'sendmail']),
            ('install postfix', ['yum', '-y', 'install', 'postfix']),
        )
        for label, yum_cmd in yum_steps:
            my_op_name = op_name + ': ' + label
            exit_code, outs, errs = self._run_yum_command(yum_cmd, my_op_name)
            if exit_code != 0:
                # Report the failing step; later steps are skipped.
                break
        return self._result_handler(exit_code, outs, errs, my_op_name, intermediate_result=intermediate_result)


class CentOS6Plesk(CentOS6, CentOSPlesk):  # pylint: disable=too-many-ancestors
    """Combination of CentOS6 and CentOSPlesk that cannot be instantiated."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:  # pylint: disable=super-init-not-called
        # Construction always fails; no base-class initialiser is run.
        raise NotImplementedError()


class CentOS7Plesk(CentOS7, CentOSPlesk):  # pylint: disable=too-many-ancestors
    """Combination of CentOS7 and CentOSPlesk; adds no overrides of its own."""


class AlmaLinux8Plesk(AlmaLinux8, CentOSPlesk):  # pylint: disable=too-many-ancestors
    """Combination of AlmaLinux8 and CentOSPlesk; adds no overrides of its own."""


class AlmaLinux9Plesk(AlmaLinux9, CentOSPlesk):  # pylint: disable=too-many-ancestors
    """Combination of AlmaLinux9 and CentOSPlesk; adds no overrides of its own."""


class DebianPlesk(Debian, LinuxPlesk):
    """Plesk ops for Debian-family servers; sendmail is removed with apt-get."""

    def configure_mta(self, payload, unused=None, intermediate_result=None) -> NydusResult:
        """Configure mail transfer agent on a Debian-based Plesk server

        :param payload: Dict containing op params
        :param unused: ignored by this method
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("DebianPlesk.configure_mta")
        op_name = 'DebianPlesk.configure_mta'
        return self.do_configure_mta(payload, op_name, intermediate_result=intermediate_result)

    def do_configure_mta(self, payload: Dict[str, Any], op_name: str,
                         intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Install postfix, then point it at the relay named in the payload.

        :param payload: Dict containing op params; 'relay_address' is read from it
        :param op_name: The name of the op, including the classname
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("DebianPlesk.do_configure_mta")
        install_result = self.install_postfix(op_name, intermediate_result=intermediate_result)
        # A Retry or an unsuccessful result is passed straight back to the caller.
        install_incomplete = isinstance(install_result, Retry) or not install_result[0]
        if install_incomplete:
            return install_result

        relay_address = payload.get('relay_address')
        LOG.info("%s %s start relay_address: %s", self.get_op_type().value, op_name, relay_address)
        return self.configure_postfix(relay_address, op_name, intermediate_result=intermediate_result)

    def install_postfix(self, op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Install postfix on a Debian-based Plesk server

        sendmail is removed with apt-get first; postfix is then installed through
        ``self._install``.

        :param op_name: The name of the op, including the classname
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("DebianPlesk.install_postfix")

        removal_op_name = op_name + ': remove_sendmail'
        exit_code, outs, errs = runCommand(['apt-get', 'remove', '-y', 'sendmail'], removal_op_name)
        if exit_code != 0:
            return self._result_handler(exit_code, outs, errs, removal_op_name,
                                        intermediate_result=intermediate_result)

        install_result = self._install('postfix')
        if isinstance(install_result, Retry):
            # Hand the retry request back unchanged.
            return install_result

        exit_code, outs, errs = install_result
        install_op_name = op_name + ': install postfix'
        return self._result_handler(exit_code, outs, errs, install_op_name,
                                    intermediate_result=intermediate_result)


class Debian8Plesk(Debian8, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Combination of Debian8 and DebianPlesk that cannot be instantiated."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:  # pylint: disable=super-init-not-called
        # Construction always fails; no base-class initialiser is run.
        raise NotImplementedError()


class Debian10Plesk(Debian10, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Combination of Debian10 and DebianPlesk that cannot be instantiated."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:  # pylint: disable=super-init-not-called
        # Construction always fails; no base-class initialiser is run.
        raise NotImplementedError()


class Debian11Plesk(Debian11, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Combination of Debian11 and DebianPlesk that cannot be instantiated."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:  # pylint: disable=super-init-not-called
        # Construction always fails; no base-class initialiser is run.
        raise NotImplementedError()


class Debian12Plesk(Debian12, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Combination of Debian12 and DebianPlesk that cannot be instantiated."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:  # pylint: disable=super-init-not-called
        # Construction always fails; no base-class initialiser is run.
        raise NotImplementedError()


class Ubuntu1604Plesk(Ubuntu1604, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Combination of Ubuntu1604 and DebianPlesk with a single Plesk install location."""

    # Only /opt/psa is probed for this class.
    PLESK_INSTALL_LOCATIONS = [PLESK_DIR_1]


class Ubuntu2004Plesk(Ubuntu2004, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Combination of Ubuntu2004 and DebianPlesk with a single Plesk install location."""

    # Only /opt/psa is probed for this class.
    PLESK_INSTALL_LOCATIONS = [PLESK_DIR_1]


class Ubuntu2204Plesk(Ubuntu2204, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Combination of Ubuntu2204 and DebianPlesk with a single Plesk install location."""

    # Only /opt/psa is probed for this class.
    PLESK_INSTALL_LOCATIONS = [PLESK_DIR_1]

Anon7 - 2022
AnonSec Team