SILENT KILLERPanel

Current Path: > > usr > share > l.v.e-manager > utils


Operation   : Linux premium131.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64
Software     : Apache
Server IP    : 162.0.232.56 | Your IP: 216.73.216.111
Domains      : 1034 Domain(s)
Permission   : [ 0755 ]

Files and Folders in: //usr/share/l.v.e-manager/utils

NameTypeSizeLast ModifiedActions
activate File 2353 bytes July 11 2025 17:46:23.
cloudlinux-cli-user.py File 491 bytes July 11 2025 17:46:23.
cloudlinux-selector.py File 654 bytes July 11 2025 17:46:23.
cloudlinux_cli_user.py File 11427 bytes July 11 2025 17:46:23.
cpanel_api.py File 9997 bytes July 11 2025 17:46:23.
libcloudlinux.py File 28400 bytes July 11 2025 17:46:23.
node_wrapper File 268 bytes July 11 2025 17:46:23.
npm_wrapper File 1730 bytes July 11 2025 17:46:23.
python_wrapper File 568 bytes July 11 2025 17:46:23.
set_env_vars.py File 3750 bytes July 11 2025 17:46:23.

Reading File: //usr/share/l.v.e-manager/utils/cpanel_api.py

# coding:utf-8

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import absolute_import
from __future__ import print_function
from lvemanager.helpers import exit_with_error, run_command
import json
from abc import abstractmethod
from clcommon.lib.whmapi_lib import WhmApiRequest, WhmApiError
import sys

OWNER_ADMIN = 'admin'
OWNER_USER = 'user'
UAPI_IGNORED_ERRORS = ["The event UAPI::LangPHP::php_set_vhost_versions was handled successfully."]

def get_cpanel_api_class(owner):
    """
    Factory method that returns
    certain class depending on
    owner's type
    :param owner: Can be 'admin' or 'user'
    :return:
    """
    if owner == OWNER_ADMIN:
        return CPanelAdminApi()
    return CPanelUserApi()


class CPanelApi(object):
    """
    Abstract method that defines abstract methods
    that must be implemented and common methods of
    the derived classes
    """

    RETURN_JSON = '--output=json'
    ALLOWED_OPERATIONS = {}

    @abstractmethod
    def check_operation_allowed(self, called_method):
        """
        Checks whether operation is allowed to perform
        :param called_method:
        :return:
        """
        raise NotImplementedError('Method check_operation_allowed must be implemented!')

    @abstractmethod
    def prepare_running_command(self, called_method, params, return_json):
        """
        Depending on the passed arguments,
        the method builds running command
        """
        raise NotImplementedError('Method prepare_running_command must be implemented!')

    @abstractmethod
    def parse_result(self, called_method, result):
        """
        Some results are parsed before sending back
        to the called. This method parses it and returns
        transformed result.
        """
        raise NotImplementedError('Method parse_result must be implemented!')

    @staticmethod
    def check_method_allowed(method_name, methods):
        """
        CPanel API contains several methods and
        not all of them if supported by this utility.
        """
        for method_object in methods:
            if method_name == method_object['method']:
                return True
        return False

    def check_for_errors(self, result):
        """
        Checks whether CPanel API returned
        error or not
        :param result:
        :return:
        """
        raise NotImplementedError('Method check_for_errors must be implemented!')

    @staticmethod
    def get_method_parser(method_name, methods):
        """
        Some result must be parsed before
        they are sent back to the caller. This
        method extracts the method that must be executed
        upon the result to transform it.
        """
        for method_object in methods:
            if method_name == method_object['method']:
                return method_object.get('parser', None)

    def get_ignore_errors(self, method_name, methods):
        """
        return ignore error flag for method
        :return:
        """
        for method_object in methods:
            if method_name == method_object['method']:
                return method_object.get('ignore_errors', None)

    @staticmethod
    def parse_vhost_versions(result):
        """
        Method parses the result of the method 'parse_vhost_versions'.
        The host will be inherited when its field 'sys_default'
        inside 'php_version_source' is equal to 1.
        """
        parsed_result = []
        for r in result:
            parsed_result.append({
                'version': r.get('version'),
                'host': r.get('vhost'),
                'php_fpm': True if r.get('php_fpm') else False,
                'inherited': True if r.get('php_version_source', {}).get('sys_default') == 1 else False
            })
        return parsed_result

    def run(self, called_method, params, return_json=True):
        """
        Default method which first checks whether operation
        is allowed then if allowed runs prepared command.
        After receiving the result, it will call parse_result method
        to transform result into desired format.
        """
        if self.check_operation_allowed(called_method):
            prepared_command = self.prepare_running_command(called_method, params, return_json)

            code, output, std_err = run_command(prepared_command, return_full_output=True)
            if code != 0:
                exit_with_error(std_err or 'output of the command: %s\n%s' % (prepared_command, output))

            try:
                result = json.loads(output)
                return self.parse_result(called_method, result)
            except ValueError as e:
                exit_with_error(e)
        else:
            exit_with_error('Not allowed operation: {}'.format(called_method))


class CPanelAdminApi(CPanelApi):
    COMMAND = '/usr/local/cpanel/bin/whmapi1'
    ALLOWED_OPERATIONS = [
        {
            'method': 'php_get_vhost_versions',
            'parser': lambda result: CPanelAdminApi.parse_vhost_versions(result)
        },
        {'method': 'php_get_system_default_version'},
        {'method': 'php_set_vhost_versions'},
        {'method': 'php_get_installed_versions'}
    ]

    def check_operation_allowed(self, called_method):
        return self.check_method_allowed(called_method, self.ALLOWED_OPERATIONS)

    def prepare_running_command(self, called_method, params, return_json):
        transformed_params = {}
        for param in params:
            key, value = param.split('=')
            transformed_params[key] = value
        return transformed_params

    def parse_result(self, called_method, result):
        parser = self.get_method_parser(called_method, self.ALLOWED_OPERATIONS)
        if parser is not None:
            return parser(result)
        return result

    @staticmethod
    def parse_vhost_versions(result):
        parsed_result = []
        for r in result['versions']:
            parsed_result.append({
                'version': r.get('version'),
                'host': r.get('vhost'),
                'user': r.get('account'),
                'php_fpm': True if r.get('php_fpm') else False,
                'inherited': True if r.get('php_version_source', {}).get('sys_default') == 1 else False
            })
        return parsed_result

    def run(self, called_method, params, return_json=True):
        params = self.prepare_running_command(called_method, params, return_json)
        if self.check_operation_allowed(called_method):
            try:
                return self.parse_result(called_method, WhmApiRequest(called_method).with_arguments(**params).call())
            except WhmApiError as e:
                print(e)
                sys.exit(1)
        else:
            exit_with_error('Not allowed operation: {}'.format(called_method))


class CPanelUserApi(CPanelApi):
    COMMAND = '/usr/bin/uapi'
    ALLOWED_OPERATIONS = {
        'LangPHP': [
            {'method': 'php_set_vhost_versions'},
            {
                'method': 'php_get_installed_versions',
                'ignore_errors': True,
            },
            {
                'method': 'php_get_vhost_versions',
                'parser': lambda result: CPanelApi.parse_vhost_versions(result),
                'ignore_errors': True,
            },
            {
                'method': 'php_get_system_default_version',
                'ignore_errors': True,
            }
        ]
    }

    def check_class_allowed(self, class_name):
        """
        CPanel UAPI requires to pass class name and
        only few of classes are supported by this utility.
        """
        return class_name in self.ALLOWED_OPERATIONS.keys()

    def check_operation_allowed(self, called_method):
        """
        Checks whether operation is allowed or not.
        By CPanel UAPI supports several operations
        but only few of them can be called via this utility.
        """
        class_name, method_name = self.extract_class_and_method(called_method)
        if self.check_class_allowed(class_name):
            return self.check_method_allowed(method_name, self.ALLOWED_OPERATIONS[class_name])
        return False

    @staticmethod
    def extract_class_and_method(called_method):
        try:
            class_name, method_name = called_method.split('::')
            return class_name, method_name
        except ValueError:
            exit_with_error('Invalid method is passed: {}'.format(called_method))

    def prepare_running_command(self, called_method, params, return_json):
        class_name, method_name = self.extract_class_and_method(called_method)
        params = [self.COMMAND, class_name, method_name] + params
        if return_json:
            params.append(self.RETURN_JSON)
        return params

    def parse_result(self, called_method, result):
        class_name, method_name = self.extract_class_and_method(called_method)
        ignore_errors = self.get_ignore_errors(method_name, self.ALLOWED_OPERATIONS[class_name])
        self.check_for_errors(result, ignore_errors)
        parser = self.get_method_parser(method_name, self.ALLOWED_OPERATIONS[class_name])
        if parser is not None:
            return parser(result['result']['data'])
        return result['result']['data']

    def check_for_errors(self, result, ignore_errors = False):
        errors = result['result'].get('errors')
        if not errors:
            return
        for error in errors:
            # TODO: The next line is workaround of cPanel issue CPANEL-35122
            # see https://support.cpanel.net/hc/en-us/articles/1500004317181-PHP-Selector-change-to-CloudLinux-PHP-version-reports-a-false-error
            if error in UAPI_IGNORED_ERRORS:
                continue
            exit_with_error(' '.join(errors), ignore_errors=ignore_errors)

SILENT KILLER Tool