Source code for kiwi.command_process

# Copyright (c) 2015 SUSE Linux GmbH.  All rights reserved.
# This file is part of kiwi.
# kiwi is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# kiwi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with kiwi.  If not, see <>
import logging
from collections import namedtuple
from kiwi.command import CommandCallT

# project
from kiwi.utils.codec import Codec
from kiwi.logger import Logger

from kiwi.exceptions import KiwiCommandError

log = logging.getLogger('kiwi')

[docs] class CommandProcess: """ **Implements processing of non blocking Command calls** Provides methods to iterate over non blocking instances of the Command class with and without progress information :param subprocess command: instance of subprocess :param string log_topic: topic string for logging """ def __init__(self, command: CommandCallT, log_topic='system') -> None: self.command = CommandIterator(command) self.log_topic = log_topic self.items_processed = 0
[docs] def poll(self): """ Iterate over process, raise on error and log output """ for line in self.command: if line: log.debug('%s: %s', self.log_topic, line) if self.command.get_error_code() != 0: raise KiwiCommandError( self.command.get_error_output() )
[docs] def poll_show_progress(self, items_to_complete, match_method): """ Iterate over process and show progress in percent raise on error and log output :param list items_to_complete: all items :param function match_method: method matching item """ self._init_progress() for line in self.command: if line: log.debug('%s: %s', self.log_topic, line) self._update_progress( match_method, items_to_complete, line ) self._stop_progress() if self.command.get_error_code() != 0: raise KiwiCommandError( self.command.get_error_output() )
[docs] def poll_and_watch(self): """ Iterate over process don't raise on error and log stdout and stderr """ log.debug('--------------out start-------------') for line in self.command: if line: log.debug(line) log.debug('--------------out stop--------------') error_code = self.command.get_error_code() error_output = self.command.get_error_output() result = namedtuple( 'result', ['stderr', 'returncode'] ) if error_output: log.debug('--------------err start-------------') log.debug(error_output) log.debug('--------------err stop--------------') return result( stderr=error_output, returncode=error_code )
[docs] def create_match_method(self, method): """ create a matcher function pointer which calls the given method as method(item_to_match, data) on dereference :param function method: function reference :return: function pointer :rtype: object """ def create_method(item_to_match, data): return method(item_to_match, data) return create_method
[docs] def returncode(self): return self.command.get_error_code()
def _init_progress(self): Logger.progress( 0, 100, '[ INFO ]: Processing' ) def _stop_progress(self): Logger.progress( 100, 100, '[ INFO ]: Processing' ) def _update_progress( self, match_method, items_to_complete, command_output ): items_count = len(items_to_complete) for item in items_to_complete: if match_method(item, command_output): self.items_processed += 1 if self.items_processed <= items_count: Logger.progress( self.items_processed, items_count, '[ INFO ]: Processing' )
[docs] class CommandIterator: """ **Implements an Iterator for Instances of Command** :param subprocess command: instance of subprocess """ def __init__(self, command: CommandCallT) -> None: self.command = command self.command_error_output = bytes(b'') self.command_output_line = bytes(b'') self.output_eof_reached = False self.errors_eof_reached = False def __next__(self): line_read = None if self.command.process.poll() is not None: if self.output_eof_reached and self.errors_eof_reached: raise StopIteration() if self.command.output_available(): byte_read = if not byte_read: self.output_eof_reached = True elif byte_read == bytes(b'\n'): line_read = Codec.decode(self.command_output_line) self.command_output_line = bytes(b'') else: self.command_output_line += byte_read if self.command.error_available(): byte_read = if not byte_read: self.errors_eof_reached = True else: self.command_error_output += byte_read return line_read
[docs] def get_error_output(self): """ Provide data which was sent to the stderr channel :return: stderr data :rtype: str """ return Codec.decode(self.command_error_output)
[docs] def get_error_code(self) -> int: """ Provide return value from processed command :return: errorcode :rtype: int """ return self.command.process.returncode
[docs] def get_pid(self) -> int: """ Provide process ID of command while running :return: pid :rtype: int """ return
[docs] def kill(self) -> None: """ Send kill signal SIGTERM to command process """ self.command.process.kill()
def __iter__(self): return self