HEX
Server: nginx/1.22.0
System: Linux iZuf6jdxbygmf6cco977lcZ 5.10.84-10.4.al8.x86_64 #1 SMP Tue Apr 12 12:31:07 CST 2022 x86_64
User: root (0)
PHP: 7.4.29
Disabled: passthru,exec,system,chroot,chgrp,chown,shell_exec,proc_open,proc_get_status,ini_alter,ini_restore,dl,readlink,symlink,popepassthru,stream_socket_server,fsocket,popen
Upload Files
File: //lib/python3.6/site-packages/cloudinit/config/cc_source_address.py
import os
from functools import partial

from cloudinit import templater
from cloudinit import util
from cloudinit.config.cc_yum_add_repo import _format_repo_value
from cloudinit.config.cc_apt_configure import get_release
from cloudinit import log as logging

LOG = logging.getLogger(__name__)


def make_repo_content(repo_id, repo_dict, k_sequence=None):
    """Render one INI-style repository section as text.

    :param repo_id: section identifier, emitted as ``[repo_id]`` on the
        first line.
    :param repo_dict: mapping of option name to value; every value is passed
        through ``_format_repo_value`` before being written.
    :param k_sequence: optional ordering of option names. Names that are not
        present in ``repo_dict`` are skipped. When falsy, the dict's own key
        order is used.
    :returns: the section header followed by one ``key=value`` line per
        emitted option, joined with newlines (no trailing newline).
    """
    ordered_keys = k_sequence if k_sequence else repo_dict.keys()
    option_lines = [
        '%s=%s' % (key, _format_repo_value(repo_dict[key]))
        for key in ordered_keys
        if key in repo_dict
    ]
    return '\n'.join(['[%s]' % repo_id] + option_lines)


def _gen_yum_base_repo(source_url, cname, major_version, cloud):
    """Build the ``<cname>-Base.repo`` file with base/updates/extras sections.

    :param source_url: iterable of mirror root URLs; each one contributes a
        ``baseurl`` and a ``gpgkey`` entry to every section.
    :param cname: distribution name; used as-is in the section ``name``, the
        GPG key file name and the repo file name, and lower-cased for the
        URL path segment.
    :param major_version: major version used in the mirror-hosted GPG key
        file name.
    :param cloud: object whose ``distro.rpmgpgkey_path`` is read; when it is
        truthy a local ``file:///`` key replaces the mirror-hosted key.
    :returns: ``(file_name, file_content)`` tuple.
    """
    issue = cname.lower()

    def construct_repo_url(repo_url, issue, repo_id):
        # str.rstrip() returns a new string; keep the result so a mirror
        # given with a trailing slash does not yield '//' in the URL.
        repo_url = repo_url.rstrip('/')
        return "%s/%s/$releasever/%s/$basearch/" % (repo_url, issue, repo_id)

    def construct_gpg_key(repo_url, cname, major_version, rpmgpgkey_path=None):
        repo_url = repo_url.rstrip('/')
        if rpmgpgkey_path:
            # Strip surrounding slashes so the result has exactly 'file:///'.
            return "file:///%s" % rpmgpgkey_path.strip('/')
        return "%s/%s/RPM-GPG-KEY-%s-%s" % (
            repo_url, issue, cname, major_version)

    repo_ids = ('base', 'updates', 'extras')
    k_sequence = (
        'name',
        'enabled',
        'failovermethod',
        'baseurl',
        'gpgcheck',
        'gpgkey')

    repo_full_content = []
    for repo_id in repo_ids:
        # The 'base' section is served from the 'os' directory; the other
        # sections use their own id as the directory name.
        repo_path_id = 'os' if repo_id == 'base' else repo_id
        repo_info_dict = {
            'name': '%s-$releasever' % cname,
            'enabled': '1',
            'failovermethod': 'priority',
            'baseurl': [construct_repo_url(url, issue, repo_path_id)
                        for url in source_url],
            'gpgcheck': '1',
            'gpgkey': [construct_gpg_key(url, cname, major_version,
                                         cloud.distro.rpmgpgkey_path)
                       for url in source_url],
        }
        repo_full_content.append(
            make_repo_content(repo_id, repo_info_dict, k_sequence))
    return ('%s-Base.repo' % cname, '\n\n'.join(repo_full_content))


def _gen_yum_epel_repo(source_url, issue, major_version):
    """Build the ``epel.repo`` file pointing at the given mirrors.

    :param source_url: iterable of mirror root URLs; each one contributes a
        ``baseurl`` and a ``gpgkey`` entry.
    :param issue: accepted for signature compatibility; not used here.
    :param major_version: major version used in the repo name, the EPEL path
        and the GPG key file name.
    :returns: ``('epel.repo', file_content)`` tuple.
    """
    def construct_repo_url(repo_url, major_version):
        # str.rstrip() returns a new string; keep the result so a mirror
        # given with a trailing slash does not yield '//' in the URL.
        repo_url = repo_url.rstrip('/')
        return '%s/epel/%s/$basearch' % (repo_url, major_version)

    def construct_gpg_key(repo_url, major_version):
        repo_url = repo_url.rstrip('/')
        return '%s/epel/RPM-GPG-KEY-EPEL-%s' % (repo_url, major_version)

    k_sequence = (
        'name',
        'enabled',
        'failovermethod',
        'baseurl',
        'gpgcheck',
        'gpgkey')
    epel_dict = {
        'name': 'Extra Packages for Enterprise Linux %s - $basearch'
                % major_version,
        'enabled': '1',
        'failovermethod': 'priority',
        'baseurl': [construct_repo_url(url, major_version)
                    for url in source_url],
        # gpgcheck is written as disabled even though a key URL is listed.
        'gpgcheck': '0',
        'gpgkey': [construct_gpg_key(url, major_version)
                   for url in source_url],
    }
    return ('epel.repo', make_repo_content('epel', epel_dict, k_sequence))


def _gen_yum_arm_base_repo(name, cfg, cloud, log, _args):
    """Render the ARM base repo file from a template (CentOS only).

    :param name: passed through to ``_gen_yum_tmpl_repo``.
    :param cfg: passed through to ``_gen_yum_tmpl_repo``.
    :param cloud: object providing ``distro.name`` and ``distro.cname``.
    :param log: logger passed through to ``_gen_yum_tmpl_repo``.
    :param _args: unused; the template arguments are built here.
    :returns: list of ``(path, content)`` tuples; empty for any distro other
        than ``centos`` or when the template helper yields nothing.
    """
    # Compare by equality: `name in ("centos")` is a substring test against
    # the string "centos" (the parentheses do not make a tuple), so values
    # such as "" or "cent" would match and None would raise TypeError.
    if cloud.distro.name != "centos":
        return []

    base_contents = _gen_yum_tmpl_repo(
        name,
        cfg,
        cloud,
        log,
        (cloud.distro.cname + "-Base.repo", "sources.list.centos.arm.base"))
    # The template helper returns None when no template is found.
    return list(base_contents or [])


def gen_yum_repo(name, cfg, cloud, log, _args):
    """Generate the yum base and EPEL repo files for the current distro.

    The target directory comes from the ``yum_repo_dir`` config option and
    defaults to ``/etc/yum.repos.d/``. On aarch64/arm64 the base repo comes
    from ``_gen_yum_arm_base_repo``; otherwise from ``_gen_yum_base_repo``.
    The EPEL repo is appended in both cases.

    :returns: list of ``(path, content)`` tuples, or ``None`` (after a
        warning) when the distro major version is not available.
    """
    mirrors = cloud.datasource.get_source_address()
    repo_dir = util.get_cfg_option_str(
        cfg, 'yum_repo_dir', '/etc/yum.repos.d/')
    distro_id = cloud.distro.name
    display_name = cloud.distro.cname
    major_version = cloud.distro.major_version
    if not major_version:
        log.warning("can't get distribution major_version version, skip it")
        return None

    on_arm = cloud.distro.get_primary_arch() in ("aarch64", "arm64")
    if on_arm:
        results = list(_gen_yum_arm_base_repo(name, cfg, cloud, log, _args))
    else:
        base_name, base_text = _gen_yum_base_repo(
            mirrors, display_name, major_version, cloud)
        results = [(os.path.join(repo_dir, base_name), base_text)]

    epel_name, epel_text = _gen_yum_epel_repo(
        mirrors, distro_id, major_version)
    results.append((os.path.join(repo_dir, epel_name), epel_text))
    return results


def _gen_yum_tmpl_repo(name, cfg, cloud, log, _args):
    """Render a yum repo file from a template, once per source address.

    :param cfg: config mapping; ``yum_repo_dir`` selects the target
        directory (default ``/etc/yum.repos.d/``).
    :param cloud: object providing ``get_template_filename``,
        ``datasource.get_source_address()`` and ``distro.major_version``.
    :param log: logger used for the missing-template warning.
    :param _args: ``(repo_file_name, template_name)``.
    :returns: list of ``(path, rendered_content)`` tuples, one per source
        address and all sharing the same path, or ``None`` when the
        template cannot be located.
    """
    repo_dir = util.get_cfg_option_str(
        cfg, 'yum_repo_dir', '/etc/yum.repos.d/')
    template_path = cloud.get_template_filename(_args[1])
    if not template_path:
        log.warning("No template found, not rendering.")
        return None
    render = partial(templater.render_from_file, template_path)

    rendered = []
    for mirror in cloud.datasource.get_source_address():
        destination = os.path.join(repo_dir, _args[0])
        params = {
            'mirror': mirror.rstrip('/'),
            'major_version': cloud.distro.major_version,
            'repo_name': _args[0],
        }
        rendered.append((destination, render(params)))
    return rendered


def gen_aliyun_yum_repo(name, cfg, cloud, log, _args):
    """Render the ``AliYun.repo`` and ``epel.repo`` files from templates.

    Each file is rendered through ``_gen_yum_tmpl_repo``. That helper
    returns ``None`` when its template is missing; such a result is skipped
    so the other repo file is still produced instead of the whole call
    failing on ``None.extend`` / ``extend(None)``.

    :param _args: unused; the template arguments are fixed here.
    :returns: list of ``(path, content)`` tuples (possibly empty).
    """
    templates = (
        ("AliYun.repo", "sources.list.aliyun"),
        ("epel.repo", "sources.list.aliyun.epel"),
    )
    fn_contents = []
    for repo_file, template_name in templates:
        rendered = _gen_yum_tmpl_repo(
            name, cfg, cloud, log, (repo_file, template_name))
        if rendered:
            fn_contents.extend(rendered)
    return fn_contents


def gen_apt_source(name, cfg, cloud, log, _args):
    """Render ``/etc/apt/sources.list`` from the distro's template.

    The template is looked up as ``sources.list.<distro name>`` and rendered
    once per source address with ``mirror``, ``security`` and ``codename``
    parameters. Ubuntu on aarch64/arm64 uses the ``<distro>-ports`` path
    under the mirror; every other case uses ``<distro>``.

    :param cloud: object providing ``get_template_filename``, ``distro`` and
        ``datasource.get_source_address()``.
    :param log: logger used for warnings.
    :returns: list of ``(path, rendered_content)`` tuples (every entry
        targets the same file), or ``None`` when the release codename or the
        template cannot be obtained.
    """
    try:
        codename = get_release()
    except Exception:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are not swallowed here.
        log.warning("cant't get distribution codename from lsb_release")
        return

    template_fn = cloud.get_template_filename(
        'sources.list.%s' % cloud.distro.name)
    if not template_fn:
        log.warning("No template found, not rendering.")
        return
    template_functor = partial(templater.render_from_file, template_fn)

    # The path segment under the mirror does not depend on the mirror, so
    # work it out once instead of on every iteration.
    distro_name = cloud.distro.name
    if distro_name == "ubuntu" and \
            cloud.distro.get_primary_arch() in ("aarch64", "arm64"):
        path_segment = distro_name + '-ports'
    else:
        path_segment = distro_name

    target_source_path = '/etc/apt/sources.list'
    fn_contents = []
    for mirror in cloud.datasource.get_source_address():
        mirror_url = mirror.rstrip('/') + '/' + path_segment + '/'
        mirror_info = {
            'mirror': mirror_url,
            'codename': codename,
            # Same URL as 'mirror' but without the trailing slash.
            'security': mirror_url.rstrip('/'),
        }
        fn_contents.append((target_source_path, template_functor(mirror_info)))
    return fn_contents


def gen_sles_repo(name, cfg, cloud, log, _args):
    """Generate zypper repo files for SLES, two per source address.

    For every source address a plain repo and an ``Updates`` repo are
    produced under ``/etc/zypp/repos.d/``. The position of the address in
    the source list is appended to the repo name.

    :param cloud: object providing ``distro.cname``,
        ``distro.major_version``, ``distro.sp_version`` and
        ``datasource.get_source_address()``.
    :param log: logger used for the missing-release warning.
    :returns: list of ``(path, content)`` tuples, or ``None`` when the
        release is not available.
    """
    cname = cloud.distro.cname
    release = cloud.distro.major_version
    sp_version = cloud.distro.sp_version

    if not release:
        log.warning("can't get distribution release")
        return

    def _gen_sles_repo(source_name, repo_type, baseurl):
        repo_name = '{cname}{release}'.format(cname=cname, release=release)
        if sp_version:
            repo_name += '-SP%s' % sp_version
        if repo_type:
            repo_name += '-%s' % repo_type
        # Strip a trailing slash from the mirror so the joined URL does not
        # contain '//' (the other generators in this module do the same).
        repo_url = '{baseurl}/{cname}/{repo_name}/sle-{release}-x86_64/'.format(
            baseurl=baseurl.rstrip('/'), cname=cname, repo_name=repo_name,
            release=release)
        # The source suffix is added after the URL is built, so it appears
        # in the repo id and file name but not in the URL path.
        repo_name += '-%s' % source_name
        repo_info = {
            'name': repo_name,
            'enabled': 1,
            'autorefresh': 1,
            'baseurl': repo_url,
        }
        k_sequence = ('name', 'enabled', 'autorefresh', 'baseurl')
        return (
            '%s.repo' % repo_name,
            make_repo_content(repo_name, repo_info, k_sequence))

    fn_contents = []
    zypper_repo_dir = '/etc/zypp/repos.d/'
    for (source_index, base_url) in enumerate(
            cloud.datasource.get_source_address()):
        for repo_type in ('', 'Updates'):
            repo_fn, repo_blob = _gen_sles_repo(
                source_index, repo_type, base_url)
            fn_contents.append(
                (os.path.join(zypper_repo_dir, repo_fn), repo_blob))
    return fn_contents


def gen_opensuse_repo(name, cfg, cloud, log, _args):
    """Generate zypper repo files for openSUSE Leap, four per source address.

    For every source address the ``Oss``, ``Non-Oss``, ``Update-Oss`` and
    ``Update-Non-Oss`` repos are produced under ``/etc/zypp/repos.d/``.

    :param cloud: object providing ``distro.cname``, ``distro.release`` and
        ``datasource.get_source_address()``.
    :param log: logger used for the missing-release warning.
    :returns: list of ``(path, content)`` tuples, or ``None`` when the
        release is not available.
    """
    cname = cloud.distro.cname
    release = cloud.distro.release
    if not release:
        log.warning("can't get distribution release")
        return

    # Path under "<mirror>/<cname lower-cased>/" for each repo type. An
    # explicit table replaces an if/elif chain whose last test,
    # `repo_type in ('Update-Non-Oss')`, was a substring check against a
    # plain string and left the path unbound for any unmatched type.
    repo_tails = {
        'Oss': 'distribution/leap/%s/repo/oss' % release,
        'Non-Oss': 'distribution/leap/%s/repo/non-oss' % release,
        'Update-Oss': 'update/leap/%s/oss' % release,
        'Update-Non-Oss': 'update/leap/%s/non-oss' % release,
    }

    def _gen_opensuse_repo(repo_type, base_url):

        def construct_repo_url(url, repo_type):
            try:
                tail = repo_tails[repo_type]
            except KeyError:
                raise ValueError('unknown repo type: %r' % (repo_type,))
            return url.rstrip('/') + '/' + cname.lower() + '/' + tail + '/'

        repo_name = '%s-%s-%s' % (cname, release, repo_type)
        repo_info = {
            'name': repo_name,
            'enabled': '1',
            'autorefresh': '1',
            'baseurl': construct_repo_url(base_url, repo_type),
        }
        k_sequence = ('name', 'enabled', 'autorefresh', 'baseurl')
        return (
            '%s.repo' % repo_name,
            make_repo_content(repo_name, repo_info, k_sequence))

    fn_contents = []
    zypper_repo_dir = '/etc/zypp/repos.d/'
    for base_url in cloud.datasource.get_source_address():
        for repo_type in ('Oss', 'Non-Oss', 'Update-Oss', 'Update-Non-Oss'):
            repo_fn, repo_blob = _gen_opensuse_repo(repo_type, base_url)
            fn_contents.append(
                (os.path.join(zypper_repo_dir, repo_fn), repo_blob))
    return fn_contents


def gen_gentoo_repo(name, cfg, cloud, log, _args):
    """Produce new ``/etc/portage/make.conf`` content with updated mirrors.

    Existing lines are kept except those that start with ``GENTOO_MIRRORS``
    (after stripping whitespace); a fresh ``GENTOO_MIRRORS`` line listing
    ``<source address>/gentoo/`` for every source address is appended.

    :param cloud: object providing ``datasource.get_source_address()``.
    :returns: single-element list holding ``(conf_path, new_content)``.
    """
    conf_fn = '/etc/portage/make.conf'
    mirror_urls = ' '.join(
        i.rstrip('/') + '/gentoo/'
        for i in cloud.datasource.get_source_address())
    out = []
    if os.path.exists(conf_fn):
        with open(conf_fn) as f:
            out = [line for line in f
                   if not line.strip().startswith('GENTOO_MIRRORS')]
    # If the last retained line lacks a newline, terminate it; otherwise the
    # appended setting would be glued onto that line.
    if out and not out[-1].endswith('\n'):
        out[-1] += '\n'
    out.append("GENTOO_MIRRORS=\"%s\"" % mirror_urls)
    return [(conf_fn, ''.join(out))]


def write_helper(fn_contents):
    """Write each ``(path, content)`` pair to disk, logging any failure.

    A failed write is logged and the remaining files are still attempted.

    :param fn_contents: iterable of ``(path, content)`` tuples. ``None`` (or
        any empty value) is accepted and treated as nothing to write, since
        the generators in this module return ``None`` on their early-exit
        paths.
    """
    for fn, content in fn_contents or ():
        try:
            util.write_file(fn, content)
        except Exception:
            # Exception rather than BaseException, so KeyboardInterrupt and
            # SystemExit still propagate.
            LOG.warning("Faild to write %s", fn)


def handle(name, cfg, cloud, log, _args):
    """Module entry point: pick a repo generator for the distro and run it.

    Nothing is done when the datasource has no source address list, when
    the distro is CentOS 8 or Anolis, or when the distro is not one of the
    supported names. Otherwise the matching generator is called and its
    ``(path, content)`` results are handed to ``write_helper``.

    :param name: module name, passed through to the generator.
    :param cfg: config mapping, passed through to the generator.
    :param cloud: object providing ``datasource.get_source_address()`` and
        ``distro.name`` / ``distro.major_version``.
    :param log: logger used for debug messages.
    :param _args: passed through to the generator.
    """
    source_address = cloud.datasource.get_source_address()
    if not source_address or not isinstance(source_address, list):
        log.debug("there is not source_address")
        return

    # NOTE: single names are compared with `==`. `x in ('centos')` is a
    # substring test against a plain string (the parentheses do not make a
    # tuple): '' would match everything and None would raise TypeError.
    distro_name = cloud.distro.name
    if distro_name == 'aliyun' and cloud.distro.major_version in ('2', '3'):
        functor = gen_aliyun_yum_repo
    elif (distro_name == 'centos' and cloud.distro.major_version == '8') \
            or distro_name == 'anolis':
        log.debug("centos 8 or anolis os 's repo base in image")
        return
    elif distro_name in ('rhel', 'centos', 'aliyun'):
        functor = gen_yum_repo
    elif distro_name in ('ubuntu', 'debian'):
        functor = gen_apt_source
    elif distro_name == 'sles':
        functor = gen_sles_repo
    elif distro_name == 'opensuse':
        functor = gen_opensuse_repo
    elif distro_name == 'gentoo':
        functor = gen_gentoo_repo
    else:
        log.debug("not support distribution %s", distro_name)
        return

    try:
        fn_contents = functor(name, cfg, cloud, log, _args)
    except Exception:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are not swallowed.
        LOG.warning("Faild to generate source address")
        return

    # Generators return None on their early-exit paths; only write when
    # there is something to write.
    if fn_contents:
        write_helper(fn_contents)