AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 18.222.161.123
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /var/opt/nydus/ops/customer_local_ops/control_panel/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /var/opt/nydus/ops/customer_local_ops/control_panel/linux_cpanel.py
from datetime import datetime
from typing import Dict, Any, List, Tuple
import functools
import logging
import os
import socket
import json

from shortuuid import ShortUUID

from customer_local_ops import OpType, ResourceType
from customer_local_ops.operating_system.linux import (
    AlmaLinux8, AlmaLinux9, Linux, CentOS, CentOS6, CentOS7, Debian, Debian8, Ubuntu1604)
from customer_local_ops.control_panel.cpanel import CPanelException, OSCPanel
from customer_local_ops.util import random_password
from customer_local_ops.util.execute import runCommand, run_uapi_command
from customer_local_ops.util.helpers import replace_line, edit_file_lines, create_file, replace_file_lines_multiple
from customer_local_ops.util.retry import Retry, RETRY

LOG = logging.getLogger(__name__)

SHORT_UUID_DEFAULT_LENGTH = 30


class LinuxCPanel(Linux, OSCPanel):
    """
    CPanel Customer Local Ops for the Linux OS.
    All function names should contain 'cpanel' so as not to override the OS ops
    """

    AUTO_RESTART_EXCLUDE_SERVICES = ['nydus-ex', 'nydus-ex-api']
    AUTO_RESTART_EXCLUSION_FILE = '/etc/cpanel/local/ignore_outdated_services'
    # ^ as of cPanel 11.76 / Jan 13 2019
    HOSTNAME_CHANGE_LOCK_FILE = '/var/cpanel/.application-locks/UpdateHostname'

    op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM

    # this is a bit different. If the user picks cpanel, we don't want to do the regular os_op configureMTA
    # (exim conflicts with sendmail)
    def configure_mta_cpanel(self, payload: Dict[str, Any], *args: Any) -> Any:
        """Configures the mail transfer agent for cPanel

        :param payload: A dict containing input data
        """
        LOG.info("LinuxCPanel.configure_mta_cpanel start")
        op_name = 'configure_mta_cpanel'
        try:
            set_tgt = functools.partial(replace_line,
                                        match='defaultmailaction',
                                        replace='defaultmailaction=fail\n',
                                        firstword=False)
            edit_file_lines('/var/cpanel/cpanel.config', set_tgt)
            relay = payload.get('relay_address')
            if relay is not None:
                exim_conf_path = '/etc/exim.conf.local'
                LOG.info("LinuxCPanel.configure_mta_cpanel writing %s", exim_conf_path)
                create_file(exim_conf_path, """
@AUTH@
@BEGINACL@
@CONFIG@
@DIRECTOREND@
@DIRECTORMIDDLE@
@DIRECTORSTART@
@ENDACL@
@RETRYEND@
@RETRYSTART@
@REWRITE@
@ROUTEREND@
@ROUTERSTART@
send_to_smart_host:
driver = manualroute
route_list = !+local_domains %s
transport = remote_smtp
@TRANSPORTEND@
@TRANSPORTMIDDLE@
@TRANSPORTSTART@
""" % relay)
        except Exception as ex:  # pylint: disable=broad-except
            LOG.error('cp_os_op LinuxCPanel configure_mta_cpanel result(fail): %s', str(ex))
            return False, self.build_result_dict('', str(ex), op_name)

        LOG.info("LinuxCPanel.configure_mta_cpanel running buildeximconf")
        return self._run_uapi_command(['/scripts/buildeximconf'], 'build exim config', op_name)

    def _run_uapi_command(self, cmd_list: List[str], description: str, op_name: str) -> Tuple[bool, Dict[str, Any]]:
        """Runs a local cPanel command

        :param cmd_list: A list of commands to run
        :param description: A description of the command(s) to be run
        :param op_name: The name of the op calling this function
        """
        exit_code, outs, errs = run_uapi_command(cmd_list, description, op_name)
        if exit_code == 0:
            # cpanel's uapi returns 0 even when it errors. For example, if you try to add the same site name twice.
            LOG.debug("cp_op_result: success! %s", errs)
            if 'uapi --user=' in ' '.join(cmd_list):
                # The errors key is always returned. A '~' value indicates no errors occurred.
                if 'errors: ~' not in outs:
                    return False, self.build_result_dict(outs, errs, op_name)
            if 'whmapi1 ' in ' '.join(cmd_list):
                # If result is 1, that denotes success. 0 denotes failure.
                if 'result: 1' not in outs:
                    return False, self.build_result_dict(outs, errs, op_name)
            return True, self.build_result_dict(outs, errs, op_name)
        return False, self.build_result_dict(outs, errs, op_name)

    def change_hostname_cpanel(self, hostname: str, *_: Any,
                               intermediate_result: Dict[str, Any] = None) -> Any:
        """Changes the server hostname via a command for the local operating system

        :param hostname: the new server hostname
        :param intermediate_result: Nydus intermediate result for storing the set-hostname
            result while waiting for the cPanel lock file to clear
        :returns: the result of the set-hostname cPanel call, in Nydus/CLO result format
        """
        LOG.info("LinuxCPanel.change_hostname_cpanel start")
        if intermediate_result is None:
            result = self._run_uapi_command(
                ['/usr/local/cpanel/bin/set_hostname', hostname],
                'set cpanel host',
                'change_hostname_cpanel')
            if not result[0]:
                return result
            intermediate_result = {'set_hostname_result': result}

        # Wait for lock file to go
        if os.path.exists(self.HOSTNAME_CHANGE_LOCK_FILE):
            LOG.info('cPanel hostname change lock file exists (%s), will try again later.',
                     self.HOSTNAME_CHANGE_LOCK_FILE)
            return Retry(intermediate_result=intermediate_result)

        return intermediate_result['set_hostname_result']

    def get_public_ip_cpanel(self, *args: Any) -> Any:
        """Gets the cPanel public IP for this server"""
        op_name = 'get_public_ip_cpanel'
        command = ['wget', '-q', '-O', '-', 'http://www.cpanel.net/showip.cgi']
        exit_code, outs, errs = runCommand(command, 'Get cPanel outbound IP')
        if exit_code == 0:
            public_ip = outs
            return public_ip

        return False, self.build_result_dict(outs, errs, op_name)

    def mark_internal_addresses_cpanel(self, private_addrs: List[str], *args: Any) -> Tuple[bool, Dict]:
        """Marks the server IPs as reserved

        :param private_addrs: A list of IPs to mark
        """
        op_name = 'mark_internal_addresses_cpanel'
        try:
            create_file('/etc/reservedips', '\n'.join(private_addrs))
            create_file('/etc/reservedipreasons', 'Internal datacenter-local addresses not publicly accessible.')
        except OSError as ex:
            return False, self.build_result_dict('', str(ex), op_name)
        return True, self.build_result_dict('Marking internal addresses succeeded', '', op_name)

    def cpanel_enable(self, cpanel_public_ip: str, *args: Any) -> Any:
        """Enables cPanel functionality for this server

        :param cpanel_public_ip: The cPanel public IP for the server
        """
        op_name = 'cpanel_enable'
        LOG.debug("cpanel_public_ip- %s", cpanel_public_ip)
        # update /etc/wwwacct.conf ADDR
        try:
            edit_file_lines('/etc/wwwacct.conf',
                            functools.partial(replace_line,
                                              match='ADDR',
                                              replace='ADDR %s\n' % cpanel_public_ip,
                                              firstword=True))
            # update /var/cpanel/mainip
            try:
                os.remove('/var/cpanel/mainip')
            except OSError:
                pass
            create_file('/var/cpanel/mainip', '%s' % cpanel_public_ip)
            create_file('/var/cpanel/activate/2012-07.v01.EULACPWHM', '')
            create_file('/etc/.whostmgrft', '')
            exit_code, outs, errs = runCommand(['sed', '-i', 's/rpmup_allow_kernel=0/rpmup_allow_kernel=1/g',
                                                '/var/cpanel/cpanel.config'], 'enable automatic kernel updates')
            LOG.debug("Automatic kernel updates result: %s: %s -- %s", exit_code, errs, outs)
            exit_code, outs, errs = runCommand(['sed', '-i',
                                                's/allow_deprecated_accesshash=0/allow_deprecated_accesshash=1/g',
                                                '/var/cpanel/cpanel.config'], 'enable accesshash')
            LOG.debug("Enable accesshash result: %s: %s -- %s", exit_code, errs, outs)
            exit_code, outs, errs = runCommand(['service', 'cpanel', 'restart'], 'restart cpanel')
            LOG.debug("restart cpanel result: %s: %s -- %s", exit_code, errs, outs)

            for feature in ('appconfig',
                            'email_archiving',
                            'email_autodiscovery',
                            'log_archiving',
                            'query_apache_for_nobody_senders',
                            'server_usage_analytics',
                            'servers_usage_analytics',
                            'smtp_restrictions',
                            'trust_x_php_script'):
                create_file(os.path.join('/var/cpanel/activate/features/', feature),
                            """USER=root
MODIFIED=%s
TIMESTAMP=%s
INTERFACE=GUI
""" % (datetime.now().ctime(), datetime.now().ctime()))

            # The following section is an attempt to reduce queueprocd errors.
            exit_code, outs, errs = runCommand(['/scripts/restartsrv_queueprocd'], 'restart queueprocd')
            LOG.debug("Restart queueprocd result: %s: %s -- %s", exit_code, errs, outs)
        except OSError as ex:
            return False, self.build_result_dict('', str(ex), op_name)
        return True, self.build_result_dict('CPanel enabled successfully', '', op_name)

    def cpanel_activate(self, vm_resource: str, *args: Any,
                        intermediate_result: Dict[str, Any] = None) -> Any:
        """Activates cPanel license for this server.

        If another process is running cpkeyclt, this one will wait and retry
        to ensure a successful licensing.

        :param vm_resource: The resource name for the third-party hosting provider
        :param intermediate_result: an intermediate result
        """
        op_name = self.OP_CPANEL_ACTIVATE
        exit_code, outs, errs = runCommand(['/usr/local/cpanel/cpkeyclt'], 'activate license')
        if 'A License check appears to already be running' in outs:
            return RETRY
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

        ops_map = self.CPANEL_OPS_RESOURCE_ATTRIBUTE_MAP[ResourceType(vm_resource)][op_name]

        if ops_map[self.RUN_INSTALLATRON_REPAIR]:
            # We need to re-initialize installatron after getting ne wkey (Only on VMs)
            exit_code, outs, errs = runCommand(
                ['rm', '-fr', '/usr/local/installatron/lib', '/usr/local/installatron/etc/php.ini'],
                'rm installatron php.ini')
            LOG.debug("rm installatron php.ini results: %s: %s -- %s", exit_code, errs, outs)
            exit_code, outs, errs = runCommand(['curl', '-O', 'https://data.installatron.com/installatron-plugin.sh'],
                                               'curl installatron-plugin')
            LOG.debug("curl installatron-plugin result: %s: %s -- %s", exit_code, errs, outs)
            exit_code, outs, errs = runCommand(['chmod', '+x', 'installatron-plugin.sh'], 'chmod plugin')
            LOG.debug("chmod plugin result: %s: %s -- %s", exit_code, errs, outs)
            exit_code, outs, errs = runCommand(['./installatron-plugin.sh', '-f', '--quick'], 'rebuild plugin')
            LOG.debug("rebuild plugin result: %s: %s -- %s", exit_code, errs, outs)

    def set_mysql_password_cpanel(self, *args: Any) -> Any:
        """Generates and sets a random mysql password for cPanel"""
        op_name = 'set_mysql_password_cpanel'
        password = random_password()
        exit_code, outs, errs = runCommand(
            ['/scripts/mysqlpasswd', 'root', password],
            'mysql password', omitString=password)
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

        exit_code, outs, errs = runCommand(['/scripts/mysqlconnectioncheck'], 'restart mysql')
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

    def enable_secure_tmp_cpanel(self, *args: Any) -> Any:
        """Re-secures the /tmp directory"""
        op_name = 'enable_secure_tmp_cpanel'
        exit_code, outs, errs = runCommand(['rm', '-f', '/var/cpanel/version/securetmp_disabled'], 're-secure tmp dir')
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

    def cpanel_prep(self, *args: Any) -> Any:
        """Pre-installs and prepares cPanel on the server"""
        op_name = 'cpanel_prep'

        try:
            self._check_hostname_cpanel()
            self._yum_update_cpanel()
            self._download_cpanel()
            self._exclude_nydus_from_auto_restart_cpanel()
            self._install_cpanel()
            self._install_installatron_cpanel()
            self._config_cpanel()
            self._disable_secure_tmp_cpanel()
        except CPanelException as ex:
            return False, self.build_result_dict(ex.outs, ex.errs, op_name)

        return self.build_result_dict('cpanel_prep complete', '', op_name)

    def _check_hostname_cpanel(self) -> bool:
        """Checks that the server's fully qualified hostname is valid for CPanel

        `hostname` -> "PT-Test.secureserver.net" but `hostname -f` -> "localhost".
        /etc/hostname: pt-test.secureserver.net
        /etc/hosts:
            127.0.0.1 localhost PT-Test.secureserver.net # cloud-controlled; do not change
            10.192.28.50 pt-test.secureserver.net pt-test
          CHANGE: removed the PT-Test on the 127.0.0.1 line.
          `hostname` -> "PT-Test.secureserver.net"
          `hostname -f` -> "pt-test.secureserver.net"
        """
        hostname = socket.gethostbyaddr(socket.gethostname())[0]
        if len(hostname.split('.')) < 3:
            raise CPanelException(
                '',
                'cPanel installation requires a fully-qualified hostname ({} is not enough)'.format(hostname))
        return True

    def _yum_update_cpanel(self) -> bool:
        """Performs a yum update on the server

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = self._yum_update()
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def _download_cpanel(self) -> bool:
        """Downloads cPanel to the server

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = runCommand(
            ['wget', 'https://securedownloads.cpanel.net/latest', '-O',
             '/root/cpinstall'],
            'download cpinstall')
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def _exclude_nydus_from_auto_restart_cpanel(self) -> bool:  # pylint:disable=invalid-name
        """Exclude Nydus services from cPanel's auto restarts.

        cPanel restarts services with outdated dependencies during install. It
        does this very generally, and Nydus services are included and stopped
        in the middle of install, causing terminal failure.

        This method adds Nydus services to the exclusion list so they are not
        restarted.

        See also, on a cPanel system:
            - /usr/local/cpanel/scripts/find_outdated_services
            - /usr/local/cpanel/Cpanel/ProcessCheck/Outdated.pm
        """
        os.makedirs(os.path.dirname(self.AUTO_RESTART_EXCLUSION_FILE), exist_ok=True)
        content = '%s\n' % '\n'.join(self.AUTO_RESTART_EXCLUDE_SERVICES)
        create_file(self.AUTO_RESTART_EXCLUSION_FILE, content)
        return True

    def _install_cpanel(self) -> bool:
        """Installs cPanel on the server

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = runCommand(['bash', '/root/cpinstall'], 'install cpanel')
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def _install_installatron_cpanel(self) -> bool:
        """Installs Installatron plugin on the server

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = runCommand(
            ['rpm', '-U', '-h', '-v',
             'http://data.installatron.com/installatron-plugin-cpanel-latest.noarch.rpm'],
            'install installatron')
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def _config_cpanel(self) -> bool:
        """Adds initial cPanel configuration

        :raises CPanelException: If there is a problem running any commands
        """
        try:
            replace_dict = {'NS2': 'NS2 ns2.secureserver.net\n',
                            'NS': 'NS ns1.secureserver.net\n',
                            'ETHDEV': 'ETHDEV eth0\n',
                            'CONTACTEMAIL': 'CONTACTEMAIL root@cpaneltmp.secureserver.net\n',
                            '^ADDR .*': 'ADDR\n'}
            replace_file_lines_multiple('/etc/wwwacct.conf', replace_dict)
            mainipfile = '/var/cpanel/mainip'
            if os.path.exists(mainipfile):
                os.unlink(mainipfile)
        except OSError as ex:
            raise CPanelException('', str(ex)) from ex
        return True

    def _disable_secure_tmp_cpanel(self) -> bool:
        """Disables noexec on the /tmp directory

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = runCommand(['touch', '/var/cpanel/version/securetmp_disabled'], 'disable noexec /tmp')
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def hulk_whitelist_cpanel(self, from_ip_addr: str, *args: Any) -> Tuple[bool, Dict[str, Any]]:
        """Allow cPanel access from customer IP

        :param from_ip_addr: The IP address from which the customer will access the cPanel instance
        """
        op_name = 'hulk_whitelist_cpanel'
        exit_code, outs, errs = runCommand(['/scripts/cphulkdwhitelist', from_ip_addr], 'cphulk whitelist')
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def get_hash_cpanel(self, *args: Any) -> Any:
        """Set cPanel hash"""

        op_name = 'get_hash_cpanel'
        path = '/root/.accesshash'
        if os.path.exists(path) and os.path.getsize(path) == 0:
            os.unlink(path)
        if not os.path.exists(path):
            exit_code, outs, errs = runCommand(['/usr/local/cpanel/whostmgr/bin/whostmgr', 'setrhash'], 'generate hash')
            if exit_code != 0:
                return False, self.build_result_dict(outs, errs, op_name)

        with open(path, 'r', encoding='utf-8') as hashf:
            cphash = hashf.read()
        return ''.join(cphash.split('\n'))

    def get_api_token_cpanel(self, *args: Any) -> Any:
        """Get cPanel API Token. Revoke any existing nydus-generated tokens before generating a new token."""

        op_name = 'get_api_token_cpanel'
        token_prefix = 'dashboard_generated_'
        token_suffix = ShortUUID().random(length=SHORT_UUID_DEFAULT_LENGTH)
        token_name = token_prefix + token_suffix

        # Retrieve and revoke any existing tokens matching the token prefix
        exit_code, outs, errs = runCommand(['/usr/local/cpanel/bin/whmapi1', '--output=jsonpretty', 'api_token_list'],
                                           'get api tokens')

        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

        tokens_json = json.loads(outs)
        tokens = tokens_json.get('data').get('tokens')
        for old_token_name in tokens.keys():
            if old_token_name.startswith(token_prefix):
                # Revoke token
                runCommand(['/usr/local/cpanel/bin/whmapi1', '--output=jsonpretty',
                            'api_token_revoke',
                            'token_name={old_token_name}'.format(old_token_name=old_token_name)],
                           'revoke token')

        exit_code, outs, errs = runCommand(['/usr/local/cpanel/bin/whmapi1', '--output=jsonpretty', 'api_token_create',
                                            "token_name={token_name}".format(token_name=token_name)],
                                           'generate api token')
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

        token_json = json.loads(outs)
        token_data = token_json.get('data')
        if isinstance(token_data, dict):
            token = token_data.get('token')
            if token is not None:
                return self.encrypt(token).decode('utf-8')

        return False, self.build_result_dict('', 'Invalid token json returned from cpanel', op_name)


class CentOSCPanel(CentOS, LinuxCPanel):
    pass


class CentOS6CPanel(CentOS6, CentOSCPanel):  # pylint: disable=too-many-ancestors
    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError


class CentOS7CPanel(CentOS7, CentOSCPanel):  # pylint: disable=too-many-ancestors
    pass


class AlmaLinux8CPanel(AlmaLinux8, CentOSCPanel):  # pylint: disable=too-many-ancestors
    pass


class AlmaLinux9CPanel(AlmaLinux9, CentOSCPanel):  # pylint: disable=too-many-ancestors
    pass


class DebianCPanel(Debian, LinuxCPanel):
    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError


class Debian8CPanel(Debian8, DebianCPanel):  # pylint: disable=too-many-ancestors
    pass


class Ubuntu1604CPanel(Ubuntu1604, DebianCPanel):  # pylint: disable=too-many-ancestors
    pass

Anon7 - 2022
AnonSec Team