HEX
Server: nginx/1.22.0
System: Linux iZuf6jdxbygmf6cco977lcZ 5.10.84-10.4.al8.x86_64 #1 SMP Tue Apr 12 12:31:07 CST 2022 x86_64
User: root (0)
PHP: 7.4.29
Disabled: passthru,exec,system,chroot,chgrp,chown,shell_exec,proc_open,proc_get_status,ini_alter,ini_restore,dl,readlink,symlink,popepassthru,stream_socket_server,fsocket,popen
Upload Files
File: //usr/local/aegis/PythonLoaderTemp/third_party/aegis_checker/aegis_checker_entry.py
# -*- coding: utf-8 -*-
import json
import os
import shutil
import socket
import sys
import logging
import argparse
import time
import base64
import zlib

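# add current dir into search path
# NOTE(review): no sys.path change actually follows this comment in this file; the local
# imports below (install, offline, performance, common, aes) presumably rely on the loader
# having already put this directory on the search path - confirm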

from aegis_checker.info import check_info, catch_aegis_stack, check_network, catch_aegis_packet, collect_aegis_log, check_result, check_issue_info
from install import check_install
from offline import check_offline
from performance import check_performance
from common.common_path import get_log_dir
from common.aegis_client_log_parser import AegisClientLogParser
from common.common_func import PrintCostTime, clear_dir
from common.print_log import log_warning, log_info
from common.network_util import get_local_ip, http_post
from common.string_util import data_to_hex_string
from common.platform_info import is_windows
from aes import AESModeOfOperationECB


def get_aegis_checker_ver():
    """
    Read the checker version from the "version" file stored next to this module.

    :return: str, the file content exactly as read (nothing is stripped, so a
             trailing newline in the file is part of the result)
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(module_dir, "version")) as version_file:
        version_text = version_file.read()
    return version_text


def zip_log():
    """
    Pack the checker log directory into a zip archive under "output" next to this module.

    The archive is named aegis_checker_log_<date>_<local ip>.zip, where <date> is the
    local time formatted as %Y.%m.%d.%H.%M.

    :return: str, path of the zip archive
    """
    source_dir = get_log_dir()
    output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output")
    # clear_dir is a project helper; presumably it empties (and creates) the directory - confirm
    clear_dir(output_dir)

    timestamp = time.strftime("%Y.%m.%d.%H.%M", time.localtime(time.time()))
    local_ip = get_local_ip()
    log_info("local ip is %s" % local_ip)

    # make_archive appends the ".zip" suffix itself, so it gets the name without extension
    archive_base = os.path.join(output_dir, "".join(("aegis_checker_log_", timestamp, "_", local_ip)))
    shutil.make_archive(archive_base, 'zip', source_dir)
    zip_path = archive_base + ".zip"

    print("")
    log_info("please upload aegis checker log for analysis, log path is %s" % zip_path)
    return zip_path


def key_to_passphrase(key):
    """
    Build a 32-character passphrase from key by cycling through its characters.

    A key shorter than 32 characters is repeated from its start; a longer key is
    cut after its first 32 characters.

    :param key: non-empty string (an empty key raises ZeroDivisionError)
    :return: str of exactly 32 characters
    """
    KEY_SIZE = 32
    return "".join(key[pos % len(key)] for pos in range(KEY_SIZE))


def encrypt_string(string, key):
    """
    Encrypt string with AES in ECB mode, one 16-character block at a time.

    :param string: text to encrypt; a short final block is padded with chr(0)
    :param key: the actual internal key will be looped if not long enough
    :return: the encrypted blocks concatenated in order ("" when string is empty)
    """
    BLOCK_SIZE = 16
    cipher = AESModeOfOperationECB(key_to_passphrase(key))

    encrypted = ""
    for start in range(0, len(string), BLOCK_SIZE):
        block = list(string[start:start + BLOCK_SIZE])
        # ECB mode encrypts exactly 16 chars per call, so fill up a short last block with chr(0)
        block.extend([chr(0)] * (BLOCK_SIZE - len(block)))
        encrypted += cipher.encrypt(block)

    return encrypted


def post_result(uuid, version, update_domains, log_zip_path, cmd_idx):
    """
    https post result to https://update.aegis.aliyun.com/update

    The check result and the base64 of the log zip are dumped to json, encrypted with
    encrypt_string(), converted to a hex string and posted to each update domain in turn.
    A domain that gives no response is skipped; the first domain that does respond
    ends the loop, whether the response reports success or not.

    :param uuid: uuid reported as "uuid"
    :param version: checker version reported as "cur_version"
    :param update_domains: list of update server domain names to try in order
    :param log_zip_path: path of the log zip; if it does not exist, empty log data is sent
    :param cmd_idx: command index reported as "cmd_idx"
    :return: str, log zip download url, if fail, return none
    """
    log_zip_data = ""
    if os.path.exists(log_zip_path):
        with open(log_zip_path, "rb") as f:
            log_zip_data = base64.b64encode(f.read())

    local_ip = get_local_ip()
    host_name = socket.gethostname()
    solution_codes = check_result.get_all_solution_codes()
    post_dict = {
        "type": "aegis_checker_result",
        "base64": 1,
        "uuid": uuid,
        "cur_version": version,
        "client_ip": local_ip,
        "host_name": host_name,
        "cmd_idx": cmd_idx,
        "issue": check_issue_info.get_issue(),
        "root_cause_code": check_result.get_all_root_cause_codes(),
        "root_cause": check_result.get_all_root_cause_descriptions(),
        "solution_code": solution_codes,
        "solution": check_result.get_all_solution_descriptions(solution_codes),
        "logfile": os.path.basename(log_zip_path),
        "logdata": log_zip_data
    }

    # the encryption key is a fixed value embedded here in base64 form
    encrypt_data = encrypt_string(json.dumps(post_dict), base64.b64decode("YWVnaXNfY2hlY2tlcl9mb3JfcG9zdA=="))
    encrypt_str = data_to_hex_string(encrypt_data)
    post_wrapper_dict = {
        "version": 3,
        "data": encrypt_str
    }

    # drop the zip payload before writing the request into the local log
    post_dict["logdata"] = ""
    logging.info(json.dumps(post_dict))

    for update_domain in update_domains:
        url = "https://%s/update" % update_domain
        ret = http_post(url, post_wrapper_dict)

        if ret is None:
            log_warning("post result to aegis server fail")
            continue

        try:
            result_dict = json.loads(ret)
        except ValueError:
            # a body that is not json must not abort the whole check run;
            # handle it below like any other failed response
            result_dict = None

        # only a json object can carry "result"/"data"; any other json value is a failure
        if isinstance(result_dict, dict) and result_dict.get("result") == 1 and "data" in result_dict:
            log_info("post result success, log url is " + result_dict["data"])
            return result_dict["data"]

        log_warning("post result fail, server response : " + str(ret))
        break
    else:
        # reached only when no domain gave any response (or the list is empty)
        log_warning("post result to aegis server fail")

    return None


def do_check(params=None):
    """
    Run the whole check flow: reset the log dir, run the info / install / offline /
    performance checks, check the network, optionally catch stacks and packets
    (mode 3), then zip the logs and post the result.

    :param params: json text, e.g. {
        "uuid": "30041741-a78a-4637-9826-63bcf80cb29e",
        "cmd_idx": "b84ff3fb-25d4-474a-9dd7-cfd71035ad78",
        "issue": "uninstall_fail",
        "mode": 1,
        "jsrv_domain": [],
        "update_domain": []
    }
    :return: the value returned by post_result() (log zip download url, or none on fail)
    """
    # clear old logs
    checker_log_dir = get_log_dir()
    clear_dir(checker_log_dir)

    logging.basicConfig(filename=os.path.join(checker_log_dir, "aegis_checker.log"),
                        filemode="w",
                        format='%(asctime)s [%(filename)s][%(levelname)s] %(message)s',
                        level=logging.DEBUG)
    # not used below, but kept referenced until the function returns;
    # presumably it reports the elapsed time - confirm in common.common_func
    print_time = PrintCostTime()

    request = {}
    if params:
        logging.debug(params)
        request = json.loads(params)
    cmd_index = request["cmd_idx"] if "cmd_idx" in request else ""
    logging.debug("cmd_idx is %s", cmd_index)

    check_issue_info.set_check_issue_info(request)
    issue = check_issue_info.get_issue()

    # handed to check_info.check() first, then reused by the install check and the log parser
    aegis_info = {
        "aegis_ver": None,
        "is_yun_dun_alive": None,
        "yun_dun_pid": None
    }

    checker_ver = get_aegis_checker_ver()
    log_info("aegis checker version is %s" % checker_ver)

    log_parser = AegisClientLogParser()
    check_info.check(log_parser, aegis_info)

    need_install_check = issue == check_issue_info.ISSUE_INSTALL_FAIL or issue == check_issue_info.ISSUE_OTHER
    if need_install_check:
        logging.info("begin to check install issue")
        check_install.check(aegis_info, log_parser)
    check_offline.check(log_parser)
    check_performance.check(log_parser)

    # log parser must start after check_info.check(), because it need set aegis version first
    log_parser.start(aegis_info)

    # a missing or empty domain list in the request falls back to the built-in servers
    requested_jsrv = "jsrv_domain" in request and request["jsrv_domain"]
    jsrv_domains = requested_jsrv or [
        "jsrv.aegis.aliyun.com",
        "jsrv2.aegis.aliyun.com",
        "jsrv3.aegis.aliyun.com",
        "jsrv4.aegis.aliyun.com",
        "jsrv5.aegis.aliyun.com"
    ]

    requested_update = "update_domain" in request and request["update_domain"]
    update_domains = requested_update or [
        "update.aegis.aliyun.com",
        "update2.aegis.aliyun.com",
        "update3.aegis.aliyun.com",
        "update4.aegis.aliyun.com",
        "update5.aegis.aliyun.com",
        "aegis.alicdn.com",
    ]

    ips = check_network.check_aegis_network(jsrv_domains, update_domains)

    if "mode" in request and request["mode"] == 3:
        max_time = request["max_time"] if "max_time" in request else 5

        stack_catcher = catch_aegis_stack.AegisStackCatcher(max_time)
        stack_catcher.start()

        packet_catcher = catch_aegis_packet.AegisPacketCatcher(ips, max_time)
        packet_catcher.start()

        # both catchers are started before either one is waited for
        for catcher in (stack_catcher, packet_catcher):
            catcher.join()

    check_result.print_result()
    collect_aegis_log.collect_aegis_log()
    zip_path = zip_log()

    log_url = post_result(check_issue_info.get_uuid(), checker_ver, update_domains, zip_path, cmd_index)
    # without a cmd_idx on windows, wait for a key press before returning
    if not cmd_index and is_windows():
        os.system("pause")
    return log_url


def parse_args():
    """
    Parse the command line of the checker.

    :return: argparse namespace; its base64 attribute holds the -b/--base64 value,
             or None when the option is not given
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "-b", "--base64",
        default=None,
        help="base64 param json, default is none")
    parsed = arg_parser.parse_args()

    # started without any argument: show the usage, but still return the defaults
    # so the caller goes on with the check
    if len(sys.argv) < 2:
        arg_parser.print_help()
    return parsed


def main():
    """
    Script entry: decode the optional base64 json param and run the check.

    When -b/--base64 is not given, do_check() gets an empty string.
    """
    args = parse_args()
    params = base64.b64decode(args.base64) if args.base64 else ""
    do_check(params)


# run the checker only when this file is executed as a script, not when it is imported
if __name__ == '__main__':
    main()