File: //lib/python3.6/site-packages/cloudinit/config/cc_source_address.py
import os
from functools import partial
from cloudinit import templater
from cloudinit import util
from cloudinit.config.cc_yum_add_repo import _format_repo_value
from cloudinit.config.cc_apt_configure import get_release
from cloudinit import log as logging
LOG = logging.getLogger(__name__)
def make_repo_content(repo_id, repo_dict, k_sequence=None):
    """Render one INI-style repository section as text.

    The result is a ``[repo_id]`` header followed by one ``key=value`` line
    per selected option, joined with newlines (no trailing newline).

    :param repo_id: section name written between square brackets.
    :param repo_dict: mapping of option name to option value; every value is
        passed through ``_format_repo_value`` before being written.
    :param k_sequence: optional iterable selecting which options to emit and
        in what order. Names missing from ``repo_dict`` are skipped. When
        falsy, the keys of ``repo_dict`` are used in the dict's own order.
    :return: the section text.
    """
    ordered_keys = k_sequence if k_sequence else repo_dict.keys()
    section = ['[%s]' % repo_id]
    section.extend(
        '%s=%s' % (key, _format_repo_value(repo_dict[key]))
        for key in ordered_keys
        if key in repo_dict
    )
    return '\n'.join(section)
def _gen_yum_base_repo(source_url, cname, major_version, cloud):
    """Build the ``<cname>-Base.repo`` file with base/updates/extras sections.

    :param source_url: iterable of mirror base URLs; every mirror contributes
        one ``baseurl`` and one ``gpgkey`` entry to each section.
    :param cname: distribution name used in the file name, the section
        ``name`` option and the remote GPG key name; its lower-cased form is
        used as the URL path component.
    :param major_version: major release, used in the remote GPG key name.
    :param cloud: object whose ``distro.rpmgpgkey_path``, when truthy, names a
        local key file referenced via ``file:///`` instead of a mirror URL.
    :return: ``(file_name, file_content)`` tuple.
    """
    issue = cname.lower()

    def construct_repo_url(repo_url, issue, repo_id):
        # str.rstrip() returns a new string, so its result has to be used;
        # otherwise a mirror ending in "/" produces "//" in the final URL.
        return "%s/%s/$releasever/%s/$basearch/" % (
            repo_url.rstrip('/'), issue, repo_id)

    def construct_gpg_key(repo_url, cname, major_version, rpmgpgkey_path=None):
        if rpmgpgkey_path:
            # A locally installed key takes precedence over the mirror copy.
            return "file:///%s" % rpmgpgkey_path.strip('/')
        return "%s/%s/RPM-GPG-KEY-%s-%s" % (
            repo_url.rstrip('/'), issue, cname, major_version)

    repo_ids = ('base', 'updates', 'extras')
    k_sequence = (
        'name',
        'enabled',
        'failovermethod',
        'baseurl',
        'gpgcheck',
        'gpgkey',
    )

    repo_full_content = []
    for repo_id in repo_ids:
        # The "base" section lives under the "os" directory on the mirror;
        # the other sections use their own id as the directory name.
        repo_path_id = 'os' if repo_id == 'base' else repo_id
        repo_info_dict = {
            'name': '%s-$releasever' % cname,
            'enabled': '1',
            'failovermethod': 'priority',
            'baseurl': [construct_repo_url(url, issue, repo_path_id)
                        for url in source_url],
            'gpgcheck': '1',
            'gpgkey': [construct_gpg_key(url, cname, major_version,
                                         cloud.distro.rpmgpgkey_path)
                       for url in source_url],
        }
        repo_full_content.append(
            make_repo_content(repo_id, repo_info_dict, k_sequence))

    return ('%s-Base.repo' % cname, '\n\n'.join(repo_full_content))
def _gen_yum_epel_repo(source_url, issue, major_version):
    """Build the ``epel.repo`` file pointing at every configured mirror.

    :param source_url: iterable of mirror base URLs; each contributes one
        ``baseurl`` and one ``gpgkey`` entry.
    :param issue: accepted for signature compatibility; not used here.
    :param major_version: major release used in the repo path, the key name
        and the human-readable repo name.
    :return: ``(file_name, file_content)`` tuple.
    """
    def construct_repo_url(repo_url, major_version):
        # str.rstrip() returns a new string, so its result has to be used;
        # otherwise a mirror ending in "/" produces "//" in the final URL.
        return '%s/epel/%s/$basearch' % (repo_url.rstrip('/'), major_version)

    def construct_gpg_key(repo_url, major_version):
        return '%s/epel/RPM-GPG-KEY-EPEL-%s' % (
            repo_url.rstrip('/'), major_version)

    k_sequence = (
        'name',
        'enabled',
        'failovermethod',
        'baseurl',
        'gpgcheck',
        'gpgkey',
    )
    epel_dict = {
        'name': 'Extra Packages for Enterprise Linux %s - $basearch'
                % major_version,
        'enabled': '1',
        'failovermethod': 'priority',
        'baseurl': [construct_repo_url(url, major_version)
                    for url in source_url],
        # Signature checking is disabled for this repo even though a key
        # location is still written out.
        'gpgcheck': '0',
        'gpgkey': [construct_gpg_key(url, major_version)
                   for url in source_url],
    }
    return ('epel.repo', make_repo_content('epel', epel_dict, k_sequence))
def _gen_yum_arm_base_repo(name, cfg, cloud, log, _args):
fn_contents = []
base_contents = []
if cloud.distro.name in ("centos"):
base_contents = _gen_yum_tmpl_repo(
name,
cfg,
cloud,
log,
(cloud.distro.cname +
"-Base.repo",
"sources.list.centos.arm.base"))
fn_contents.extend(base_contents)
return fn_contents
def gen_yum_repo(name, cfg, cloud, log, _args):
    """Generate the yum base and EPEL repo files for the current distro.

    The target directory comes from the ``yum_repo_dir`` config option and
    defaults to ``/etc/yum.repos.d/``.

    :param name: handler name, forwarded to the ARM helper.
    :param cfg: configuration mapping.
    :param cloud: object providing ``datasource.get_source_address()`` and
        the ``distro`` attributes read below.
    :param log: logger used for the skip warning.
    :param _args: forwarded to the ARM helper.
    :return: list of ``(path, content)`` tuples, or ``None`` when the major
        version is unknown.
    """
    mirrors = cloud.datasource.get_source_address()
    repo_dir = util.get_cfg_option_str(
        cfg, 'yum_repo_dir', '/etc/yum.repos.d/')
    distro_id = cloud.distro.name
    distro_cname = cloud.distro.cname
    release_major = cloud.distro.major_version
    if not release_major:
        log.warning("can't get distribution major_version version, skip it")
        return

    on_arm = cloud.distro.get_primary_arch() in ("aarch64", "arm64")
    if on_arm:
        # ARM hosts take their base repo from a rendered template.
        results = list(
            _gen_yum_arm_base_repo(name, cfg, cloud, log, _args))
    else:
        base_name, base_text = _gen_yum_base_repo(
            mirrors, distro_cname, release_major, cloud)
        results = [(os.path.join(repo_dir, base_name), base_text)]

    # NOTE(review): the indentation of this block was reconstructed; the EPEL
    # step is taken to apply to every architecture -- confirm.
    epel_name, epel_text = _gen_yum_epel_repo(
        mirrors, distro_id, release_major)
    results.append((os.path.join(repo_dir, epel_name), epel_text))
    return results
def _gen_yum_tmpl_repo(name, cfg, cloud, log, _args):
    """Render a repo file from a named template, once per mirror.

    :param name: unused.
    :param cfg: configuration mapping; ``yum_repo_dir`` selects the target
        directory (default ``/etc/yum.repos.d/``).
    :param cloud: object providing ``get_template_filename()``,
        ``datasource.get_source_address()`` and ``distro.major_version``.
    :param log: logger used when the template cannot be found.
    :param _args: two-item sequence ``(repo_file_name, template_name)``.
    :return: list of ``(path, rendered_text)`` tuples, one per mirror and all
        sharing the same path, or ``None`` when no template is found.
    """
    repo_dir = util.get_cfg_option_str(
        cfg, 'yum_repo_dir', '/etc/yum.repos.d/')
    template_path = cloud.get_template_filename(_args[1])
    if not template_path:
        log.warning("No template found, not rendering.")
        return

    rendered = []
    for mirror in cloud.datasource.get_source_address():
        destination = os.path.join(repo_dir, _args[0])
        params = {
            'mirror': mirror.rstrip('/'),
            'major_version': cloud.distro.major_version,
            'repo_name': _args[0],
        }
        rendered.append(
            (destination, templater.render_from_file(template_path, params)))
    return rendered
def gen_aliyun_yum_repo(name, cfg, cloud, log, _args):
    """Generate the Aliyun base and EPEL repo files from their templates.

    :param name: handler name, forwarded to ``_gen_yum_tmpl_repo``.
    :param cfg: configuration mapping, forwarded to ``_gen_yum_tmpl_repo``.
    :param cloud: cloud object, forwarded to ``_gen_yum_tmpl_repo``.
    :param log: logger, forwarded to ``_gen_yum_tmpl_repo``.
    :param _args: unused; the template arguments are fixed here.
    :return: list of ``(path, content)`` tuples, possibly empty.
    """
    template_jobs = (
        ("AliYun.repo", "sources.list.aliyun"),
        ("epel.repo", "sources.list.aliyun.epel"),
    )
    fn_contents = []
    for job in template_jobs:
        rendered = _gen_yum_tmpl_repo(name, cfg, cloud, log, job)
        # _gen_yum_tmpl_repo returns None when its template is missing; skip
        # that file instead of failing on None and losing the other one.
        if rendered:
            fn_contents.extend(rendered)
    return fn_contents
def gen_apt_source(name, cfg, cloud, log, _args):
    """Render ``/etc/apt/sources.list`` from the distro template per mirror.

    The template is looked up as ``sources.list.<distro name>``. On Ubuntu
    running on aarch64/arm64 the mirror path uses ``<distro>-ports`` instead
    of ``<distro>``.

    :param name: unused.
    :param cfg: unused.
    :param cloud: object providing ``get_template_filename()``,
        ``datasource.get_source_address()`` and ``distro``.
    :param log: logger used for skip warnings.
    :param _args: unused.
    :return: list of ``(path, rendered_text)`` tuples, one per mirror and all
        targeting the same path, or ``None`` when the codename or the
        template cannot be determined.
    """
    try:
        codename = get_release()
    except Exception:
        # Catch Exception rather than BaseException so that SystemExit and
        # KeyboardInterrupt are not swallowed here.
        log.warning("cant't get distribution codename from lsb_release")
        return

    template_fn = cloud.get_template_filename(
        'sources.list.%s' % (cloud.distro.name))
    if not template_fn:
        log.warning("No template found, not rendering.")
        return

    target_source_path = '/etc/apt/sources.list'
    distro_name = cloud.distro.name
    use_ports = (distro_name == "ubuntu" and
                 cloud.distro.get_primary_arch() in ("aarch64", "arm64"))
    path_component = distro_name + '-ports' if use_ports else distro_name

    fn_contents = []
    for mirror in cloud.datasource.get_source_address():
        mirror_url = mirror.rstrip('/') + '/' + path_component + '/'
        mirror_info = {
            'mirror': mirror_url,
            'codename': codename,
            # The security entry is the same mirror without trailing slash.
            'security': mirror_url.rstrip('/'),
        }
        fn_contents.append(
            (target_source_path,
             templater.render_from_file(template_fn, mirror_info)))
    return fn_contents
def gen_sles_repo(name, cfg, cloud, log, _args):
    """Generate zypper repo files for SLES, two per mirror.

    For every mirror a plain repo and an ``Updates`` repo are produced under
    ``/etc/zypp/repos.d/``. The mirror's position in the source-address list
    is appended to the repo name.

    :param name: unused.
    :param cfg: unused.
    :param cloud: object providing ``distro.cname``, ``distro.major_version``,
        ``distro.sp_version`` and ``datasource.get_source_address()``.
    :param log: logger used when the release is unknown.
    :param _args: unused.
    :return: list of ``(path, content)`` tuples, or ``None`` when the release
        is unknown.
    """
    distro = cloud.distro
    cname = distro.cname
    release = distro.major_version
    sp_version = distro.sp_version
    if not release:
        log.warn("can't get distribution release")
        return

    option_order = ('name', 'enabled', 'autorefresh', 'baseurl')

    def _gen_sles_repo(source_name, repo_type, baseurl):
        # Channel name: <cname><release>[-SP<n>][-<type>]
        pieces = ['{0}{1}'.format(cname, release)]
        if sp_version:
            pieces.append('SP%s' % sp_version)
        if repo_type:
            pieces.append('%s' % repo_type)
        channel = '-'.join(pieces)

        url = '{0}/{1}/{2}/sle-{3}-x86_64/'.format(
            baseurl, cname, channel, release)
        # The mirror index is added only after the URL has been built.
        repo_name = '%s-%s' % (channel, source_name)
        options = {
            'name': repo_name,
            'enabled': 1,
            'autorefresh': 1,
            'baseurl': url,
        }
        return (
            '%s.repo' % repo_name,
            make_repo_content(repo_name, options, option_order))

    zypper_repo_dir = '/etc/zypp/repos.d/'
    results = []
    mirrors = cloud.datasource.get_source_address()
    for index, mirror in enumerate(mirrors):
        for repo_type in ('', 'Updates'):
            file_name, text = _gen_sles_repo(index, repo_type, mirror)
            results.append((os.path.join(zypper_repo_dir, file_name), text))
    return results
def gen_opensuse_repo(name, cfg, cloud, log, _args):
    """Generate zypper repo files for openSUSE Leap, four per mirror.

    The Oss, Non-Oss, Update-Oss and Update-Non-Oss repos are written under
    ``/etc/zypp/repos.d/``. File names do not include the mirror, so entries
    for different mirrors target the same paths.

    :param name: unused.
    :param cfg: unused.
    :param cloud: object providing ``distro.cname``, ``distro.release`` and
        ``datasource.get_source_address()``.
    :param log: logger used when the release is unknown.
    :param _args: unused.
    :return: list of ``(path, content)`` tuples, or ``None`` when the release
        is unknown.
    """
    cname = cloud.distro.cname
    release = cloud.distro.release
    if not release:
        log.warning("can't get distribution release")
        return

    # Explicit table of repo type -> path below "<mirror>/<cname>/". This
    # replaces an if/elif chain whose last test, ``in ('Update-Non-Oss')``,
    # was a substring check against a string and which left the path unbound
    # for an unknown type. A tuple of pairs keeps the emission order fixed.
    repo_tails = (
        ('Oss', 'distribution/leap/%s/repo/oss'),
        ('Non-Oss', 'distribution/leap/%s/repo/non-oss'),
        ('Update-Oss', 'update/leap/%s/oss'),
        ('Update-Non-Oss', 'update/leap/%s/non-oss'),
    )
    k_sequence = ('name', 'enabled', 'autorefresh', 'baseurl')
    zypper_repo_dir = '/etc/zypp/repos.d/'

    fn_contents = []
    for base_url in cloud.datasource.get_source_address():
        url_prefix = base_url.rstrip('/') + '/' + cname.lower() + '/'
        for repo_type, tail_template in repo_tails:
            repo_name = '%s-%s-%s' % (cname, release, repo_type)
            repo_info = {
                'name': repo_name,
                'enabled': '1',
                'autorefresh': '1',
                'baseurl': url_prefix + (tail_template % release) + '/',
            }
            fn_contents.append(
                (os.path.join(zypper_repo_dir, '%s.repo' % repo_name),
                 make_repo_content(repo_name, repo_info, k_sequence)))
    return fn_contents
def gen_gentoo_repo(name, cfg, cloud, log, _args):
    """Produce a new ``/etc/portage/make.conf`` with GENTOO_MIRRORS replaced.

    Existing lines are kept, except those whose stripped text starts with
    ``GENTOO_MIRRORS``; a single new ``GENTOO_MIRRORS="..."`` line listing
    every mirror (each suffixed with ``/gentoo/``) is appended at the end.

    :param name: unused.
    :param cfg: unused.
    :param cloud: object providing ``datasource.get_source_address()``.
    :param log: unused.
    :param _args: unused.
    :return: one-element list holding ``(path, content)``.
    """
    conf_fn = '/etc/portage/make.conf'
    mirror_urls = ' '.join(
        url.rstrip('/') + '/gentoo/'
        for url in cloud.datasource.get_source_address())

    kept_lines = []
    if os.path.exists(conf_fn):
        with open(conf_fn) as conf_file:
            kept_lines = [
                line for line in conf_file
                if not line.strip().startswith('GENTOO_MIRRORS')
            ]

    # If the file did not end with a newline, the appended setting would be
    # glued onto the last existing line; terminate that line first.
    if kept_lines and not kept_lines[-1].endswith('\n'):
        kept_lines[-1] += '\n'

    kept_lines.append("GENTOO_MIRRORS=\"%s\"" % mirror_urls)
    return [(conf_fn, ''.join(kept_lines))]
def write_helper(fn_contents):
    """Write each ``(path, content)`` pair to disk, best effort.

    A failure to write one file is logged and does not stop the remaining
    files from being written.

    :param fn_contents: iterable of ``(path, content)`` tuples. ``None`` or
        an empty iterable means there is nothing to write.
    """
    # Generators in this module return None to signal "skip"; iterating None
    # would raise TypeError, so treat it as an empty work list.
    if not fn_contents:
        return
    for fn, content in fn_contents:
        try:
            util.write_file(fn, content)
        except Exception:
            # Exception (not BaseException) so SystemExit/KeyboardInterrupt
            # still propagate.
            LOG.warning("Faild to write %s", fn)
def handle(name, cfg, cloud, log, _args):
    """Pick the repo generator for the current distro and write its output.

    Nothing is done when the datasource provides no source address list,
    when the distro is CentOS 8 or Anolis (skipped with a debug message),
    or when the distro is not supported.

    :param name: handler name, forwarded to the generator.
    :param cfg: configuration mapping, forwarded to the generator.
    :param cloud: object providing ``datasource.get_source_address()`` and
        ``distro``.
    :param log: logger for debug messages; also forwarded to the generator.
    :param _args: forwarded to the generator.
    """
    source_address = cloud.datasource.get_source_address()
    if not source_address or not isinstance(source_address, list):
        log.debug("there is not source_address")
        return

    distro_name = cloud.distro.name
    # Comparisons below use ==/tuples on purpose: ``x in ("centos")`` is a
    # substring test against a string, which also matched partial names and
    # let an empty major_version match '8'. major_version is only read for
    # the distros that need it.
    if distro_name == 'aliyun' and cloud.distro.major_version in ('2', '3'):
        functor = gen_aliyun_yum_repo
    elif ((distro_name == 'centos' and cloud.distro.major_version == '8')
          or distro_name == 'anolis'):
        log.debug("centos 8 or anolis os 's repo base in image")
        return
    elif distro_name in ('rhel', 'centos', 'aliyun'):
        functor = gen_yum_repo
    elif distro_name in ('ubuntu', 'debian'):
        functor = gen_apt_source
    elif distro_name == 'sles':
        functor = gen_sles_repo
    elif distro_name == 'opensuse':
        functor = gen_opensuse_repo
    elif distro_name == 'gentoo':
        functor = gen_gentoo_repo
    else:
        log.debug("not support distribution %s", distro_name)
        return

    try:
        fn_contents = functor(name, cfg, cloud, log, _args)
    except Exception:
        # Exception (not BaseException) so SystemExit/KeyboardInterrupt
        # still propagate.
        LOG.warning("Faild to generate source address")
        return

    # Generators return None (or an empty list) when they decide to skip.
    if fn_contents:
        write_helper(fn_contents)