Source code for kiwi.storage.loop_device

# Copyright (c) 2015 SUSE Linux GmbH.  All rights reserved.
#
# This file is part of kiwi.
#
# kiwi is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# kiwi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with kiwi.  If not, see <http://www.gnu.org/licenses/>
#
import os
import logging
import time
import pathlib

# project
from kiwi.command import Command
from kiwi.storage.device_provider import DeviceProvider
from kiwi.utils.command_capabilities import CommandCapabilities

from kiwi.exceptions import (
    KiwiLoopSetupError,
    KiwiCommandNotFound,
    KiwiCommandError
)

log = logging.getLogger('kiwi')


[docs] class LoopDevice(DeviceProvider): """ **Create and manage loop device file for block operations** :param string filename: loop file name to create :param int filesize_mbytes: size of the loop file :param int blocksize_bytes: blocksize used in loop driver """ def __init__( self, filename: str, filesize_mbytes: int = None, blocksize_bytes: int = None ): self.node_name = '' if not os.path.exists(filename) and not filesize_mbytes: raise KiwiLoopSetupError( 'Can not create loop file without a size' ) self.filename = filename self.filesize_mbytes = filesize_mbytes self.blocksize_bytes = blocksize_bytes def __enter__(self): return self
[docs] def get_device(self) -> str: """ Device node name :return: device node name :rtype: str """ return self.node_name
[docs] def is_loop(self) -> bool: """ Always True :return: True :rtype: bool """ return True
[docs] def create(self, overwrite: bool = True): """ Setup a loop device of the blocksize given in the constructor The file to loop is created with the size specified in the constructor unless an existing one should not be overwritten :param bool overwrite: overwrite existing file to loop """ if overwrite: qemu_img_size = format(self.filesize_mbytes) + 'M' Command.run( ['qemu-img', 'create', self.filename, qemu_img_size] ) loop_options = [] if self.blocksize_bytes and self.blocksize_bytes != 512: if CommandCapabilities.has_option_in_help( 'losetup', '--sector-size', raise_on_error=False ): loop_options.append('--sector-size') else: loop_options.append('--logical-blocksize') loop_options.append(format(self.blocksize_bytes)) loop_call = Command.run( ['losetup'] + loop_options + ['-f', '--show', self.filename] ) self.node_name = loop_call.output.rstrip(os.linesep)
def __exit__(self, exc_type, exc_value, traceback): if self.node_name: try: Command.run(['losetup', '-d', self.node_name]) # loop detach is an async operation, re-use of the device # should not be done before the block device has been released loop_released = False sys_block_loop = pathlib.Path( '/sys/devices/virtual/block/{0}/loop'.format( os.path.basename(self.node_name) ) ) for busy in range(0, 100): if not sys_block_loop.is_block_device(): loop_released = True break time.sleep(0.1) if not loop_released: raise KiwiCommandError( f'Loop device {self.node_name} still attached' ) except (KiwiCommandError, KiwiCommandNotFound) as issue: log.error( 'loop cleanup on {0} failed with: {1}'.format( self.node_name, issue ) )