Source code for kiwi.solver.repository.base

# Copyright (c) 2015 SUSE Linux GmbH.  All rights reserved.
#
# This file is part of kiwi.
#
# kiwi is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# kiwi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with kiwi.  If not, see <http://www.gnu.org/licenses/>
#
from base64 import b64encode
from urllib.request import urlopen
from urllib.request import Request
from kiwi.utils.temporary import Temporary
from lxml import etree
import random
import glob
import os
import re

# project
import kiwi.defaults as defaults

from kiwi.exceptions import KiwiUriOpenError
from kiwi.path import Path
from kiwi.command import Command
from kiwi.defaults import Defaults


class SolverRepositoryBase:
    """
    **Base class interface for SAT solvable creation.**

    :param object uri: Instance of :class:`Uri`
    :param string user: User name for uri authentication
    :param string secret: Secret token for uri authentication
    """
    def __init__(self, uri, user=None, secret=None):
        self.uri = uri
        # check if the URI string contains credentials and
        # extract/trim them from the uri object. The urlparse
        # class does not recognize this information as a valid
        # URI and throws an exception
        secret_format = re.match(r'^(.*)://(.*):(.*)@(.*)', uri.uri)
        if secret_format:
            # credentials embedded in the URI take precedence over
            # the user/secret arguments
            self.user = secret_format.group(2)
            self.secret = secret_format.group(3)
            self.uri.uri = \
                f'{secret_format.group(1)}://{secret_format.group(4)}'
        else:
            self.user = user
            self.secret = secret
        # references to temporary directory objects created by
        # _create_temporary_metadata_dir
        self.repository_metadata_dirs = []
        # temporary directory object holding the intermediate
        # solvables, created on demand by _create_solvables
        self.repository_solvable_dir = None

    def create_repository_solvable(
        self, target_dir=Defaults.get_solvable_location()
    ):
        """
        Create SAT solvable for this repository from previously
        created intermediate solvables by merge and store the
        result solvable in the specified target_dir

        If the stored solvable is up to date, no metadata is
        downloaded and the path of the existing solvable is returned.

        :param str target_dir: path name

        :return: file path to solvable, or None if a refresh was
            needed but no intermediate solvables were created

        :rtype: str
        """
        Path.create(target_dir)
        solvable = os.sep.join(
            [target_dir, self.uri.alias()]
        )
        if not self.is_uptodate(target_dir):
            self._setup_repository_metadata()
            solvable = self._merge_solvables(target_dir)
        return solvable

    def timestamp(self):
        """
        Return repository timestamp

        The retrieval of the repository timestamp depends on the
        type of the repository and is therefore supposed to be implemented
        in the specialized Solver Repository classes. If no such
        implementation exists the method returns the value 'static'
        to indicate there is no timestamp information available.

        :rtype: str
        """
        return 'static'

    def is_uptodate(self, target_dir=Defaults.get_solvable_location()):
        """
        Check if repository metadata is up to date

        The stored ``<alias>.timestamp`` file below target_dir is
        compared with the value of :meth:`timestamp`. A stored value
        of 'static' is never considered up to date.

        :param str target_dir: path name

        :return: True or False

        :rtype: bool
        """
        solvable_time_file = ''.join(
            [target_dir, os.sep, self.uri.alias(), '.timestamp']
        )
        if os.path.exists(solvable_time_file):
            with open(solvable_time_file) as solvable_time:
                saved_time = solvable_time.read()
                if saved_time == self.timestamp() and \
                        not saved_time == 'static':
                    return True
        return False

    def download_from_repository(self, repo_source, target):
        """
        Download given source file from the repository and store
        it as target file

        The repo_source location is used relative to the repository
        location and will be part of a mime type source like:
        `file://repo_path/repo_source`

        :param str repo_source: source file in the repo
        :param str target: file path

        :raises KiwiUriOpenError: if the download fails
        """
        download_link = None
        try:
            download_link = os.sep.join(
                [
                    self._get_mime_typed_uri(), repo_source
                ]
            )
            request = Request(download_link)
            if self.user and self.secret:
                # HTTP basic authentication: base64 of "user:secret"
                credentials = b64encode(
                    ':'.join([self.user, self.secret]).encode()
                )
                request.add_header(
                    'Authorization', b'Basic ' + credentials
                )
            location = urlopen(request)
        except Exception as issue:
            raise KiwiUriOpenError(
                f'{type(issue).__name__}: {issue} {download_link}'
            ) from issue
        try:
            with open(target, 'wb') as target_file:
                target_file.write(location.read())
        finally:
            # always release the connection/file handle behind the
            # response, also if writing the target file fails
            location.close()

    def get_repo_type(self):
        """
        Detect the repository type by probing for type specific files

        The checks run in the order rpm-md, apt-deb, pacman. A probe
        that fails with KiwiUriOpenError is skipped.

        :return:
            'rpm-md', 'apt-deb', 'pacman' or None if no probe matched

        :rtype: str
        """
        try:
            if self._get_repomd_xml():
                return 'rpm-md'
        except KiwiUriOpenError:
            pass

        try:
            if self._get_deb_packages():
                return 'apt-deb'
        except KiwiUriOpenError:
            pass

        try:
            repo_listing = self._get_pacman_packages()
            # the listing is None if the download target is not a
            # file; guard against it before the substring check
            if repo_listing and '.db.sig\"' in repo_listing:
                return 'pacman'
        except KiwiUriOpenError:
            pass

        return None

    def _get_pacman_packages(self):
        """
        Download Arch repository listing for the current architecture

        :return:
            html directory listing, or None if the download target
            is not a regular file

        :rtype: str
        """
        dir_listing_download = Temporary().new_file()
        self.download_from_repository(
            defaults.PLATFORM_MACHINE, dir_listing_download.name
        )
        if os.path.isfile(dir_listing_download.name):
            with open(dir_listing_download.name) as listing:
                return listing.read()

    def _get_deb_packages(self, download_dir=None):
        """
        Download Packages.gz file from an apt repository and
        return its contents. If download_dir is set, download
        the file and return the file path name

        :param str download_dir: Download directory

        :return: Contents of Packages file or file path name

        :rtype: str
        """
        repo_source = 'Packages.gz'
        if not download_dir:
            packages_download = Temporary().new_file()
            self.download_from_repository(repo_source, packages_download.name)
            if os.path.isfile(packages_download.name):
                # NOTE(review): the downloaded file is presumably gzip
                # compressed but is read in text mode here; this may
                # raise UnicodeDecodeError on real data - confirm
                with open(packages_download.name) as packages:
                    return packages.read()
        else:
            packages_download = os.sep.join(
                [download_dir, repo_source.replace(os.sep, '_')]
            )
            self.download_from_repository(repo_source, packages_download)
            return packages_download

    def _get_repomd_xml(self, lookup_path='repodata'):
        """
        Parse repomd.xml file from lookup_path and return an etree
        This method only applies to rpm-md type repositories

        :param str lookup_path: relative path used to find repomd.xml file

        :return: Object with repomd.xml contents

        :rtype: XML etree
        """
        xml_download = Temporary().new_file()
        xml_setup_file = os.sep.join([lookup_path, 'repomd.xml'])
        self.download_from_repository(xml_setup_file, xml_download.name)
        return etree.parse(xml_download.name)

    def _get_repomd_xpath(self, xml_data, expression):
        """
        Call the provided xpath expression on the root element of the
        xml_data which must be an XML etree parsed document and return
        the result.

        This method only applies to repomd.xml files of the correct
        namespaces:

        * http://linux.duke.edu/metadata/repo
        * http://linux.duke.edu/metadata/rpm

        :param object xml_data: An XML etree object of a parsed XML file.
        :param str expression: An xpath expression to filter xml_data.

        :return: Elements matching the xpath expression

        :rtype: list
        """
        namespace_map = dict(
            repo='http://linux.duke.edu/metadata/repo',
            rpm='http://linux.duke.edu/metadata/rpm'
        )
        return xml_data.getroot().xpath(
            expression, namespaces=namespace_map
        )

    def _setup_repository_metadata(self):
        """
        Download all relevant repository metadata and create
        intermediate solvables from the result. The metadata
        structure depends on the type of the repository and
        must be implemented in the specialized Solver Repository
        classes.
        """
        raise NotImplementedError

    def _create_solvables(self, metadata_dir, tool):
        """
        Create intermediate (before merge) SAT solvables from the
        data given in the metadata_dir and store the result in the
        temporary repository_solvable_dir. The given tool must
        match the solvable data structure. There are the following
        tools to create a solvable from repository metadata:

        * rpmmd2solv
          solvable from repodata files

        * susetags2solv
          solvable from SUSE (yast2) repository files

        * comps2solv
          solvable from RHEL component files

        * rpms2solv
          solvable from rpm header files

        * deb2solv
          solvable from deb header files

        :param str metadata_dir: path name
        :param str tool: one of the above tools
        """
        if not self.repository_solvable_dir:
            self.repository_solvable_dir = Temporary(
                prefix='kiwi_solvable_dir.'
            ).new_dir()

        if tool == 'rpms2solv':
            # solvable is created from a bunch of rpm files
            bash_command = [
                tool, os.sep.join([metadata_dir, '*.rpm']),
                '>', self._get_random_solvable_name()
            ]
            Command.run(['bash', '-c', ' '.join(bash_command)])
        else:
            # each file in the metadata_dir is considered a valid
            # solvable for the selected solv tool
            tool_options = []
            if tool == 'deb2solv':
                tool_options.append('-r')
            for source in glob.iglob('/'.join([metadata_dir, '*'])):
                # decompress to stdout and pipe into the solv tool;
                # the commands are joined unquoted, so paths with
                # shell special characters are not supported here
                bash_command = [
                    'gzip', '-cd', '--force', source, '|', tool
                ] + tool_options + [
                    '>', self._get_random_solvable_name()
                ]
                Command.run(['bash', '-c', ' '.join(bash_command)])

    def _merge_solvables(self, target_dir):
        """
        Merge all intermediate SAT solvables into one and store
        the result in the given target_dir. In addition an info
        file containing the repo url and a timestamp file is
        created

        :param str target_dir: path name

        :return:
            file path to the merged solvable, or None if no
            intermediate solvable directory exists

        :rtype: str
        """
        if self.repository_solvable_dir:
            solvable = os.sep.join([target_dir, self.uri.alias()])
            bash_command = [
                'mergesolv',
                '/'.join([self.repository_solvable_dir.name, '*']),
                '>',
                solvable
            ]
            Command.run(['bash', '-c', ' '.join(bash_command)])
            with open('.'.join([solvable, 'info']), 'w') as solvable_info:
                solvable_info.write(''.join([self.uri.uri, os.linesep]))
            with open('.'.join([solvable, 'timestamp']), 'w') as solvable_time:
                solvable_time.write(self.timestamp())
            return solvable

    def _get_mime_typed_uri(self):
        """
        Adds `file` scheme for local URIs

        :return: A mime typed URI as string

        :rtype: str
        """
        return self.uri.translate() if self.uri.is_remote() else ''.join(
            ['file://', self.uri.translate()]
        )

    def _create_temporary_metadata_dir(self):
        """
        Create and manage a temporary metadata directory

        The directory object is stored in repository_metadata_dirs
        so that a reference to it is kept on the instance.

        :return: the path of the temporary directory just created

        :rtype: str
        """
        metadata_dir = Temporary(prefix='kiwi_metadata_dir.').new_dir()
        self.repository_metadata_dirs.append(metadata_dir)
        return metadata_dir.name

    def _get_random_solvable_name(self):
        """
        Build a random file name below the intermediate solvable
        directory

        :return:
            path of the form ``<solvable_dir>/solvable-<8 hex digits>``
            or None if no solvable directory has been created yet

        :rtype: str
        """
        if self.repository_solvable_dir:
            random_suffix = ''.join(self._rand() for _ in range(4))
            return f'{self.repository_solvable_dir.name}/solvable-' \
                f'{random_suffix}'

    def _rand(self):
        """
        Return a random number from the range 0x01..0xfd formatted
        as two digit lowercase hex string

        :rtype: str
        """
        return '%02x' % random.randrange(1, 0xfe)