# Copyright (c) 2015 SUSE Linux GmbH. All rights reserved.
#
# This file is part of kiwi.
#
# kiwi is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# kiwi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with kiwi. If not, see <http://www.gnu.org/licenses/>
#
import re
import os
import logging
import shutil
import datetime
from xml.etree import ElementTree
from xml.dom import minidom
from typing import List
# project
from kiwi.command import Command
from kiwi.volume_manager.base import VolumeManagerBase
from kiwi.mount_manager import MountManager
from kiwi.storage.mapped_device import MappedDevice
from kiwi.filesystem import FileSystem
from kiwi.utils.sync import DataSync
from kiwi.utils.block import BlockID
from kiwi.utils.sysconfig import SysConfig
from kiwi.path import Path
from kiwi.defaults import Defaults
from kiwi.exceptions import (
KiwiVolumeRootIDError,
KiwiVolumeManagerSetupError
)
# Module-level logger; all messages of this module are emitted through
# the logger registered under the name 'kiwi'
log = logging.getLogger('kiwi')
[docs]
class VolumeManagerBtrfs(VolumeManagerBase):
    """
    Implements btrfs sub-volume management

    The instance state used by the methods of this class is
    initialized in post_init

    :param list subvol_mount_list: list of :class:`MountManager`
        instances, one per btrfs subvolume to be mounted
    :param object toplevel_mount: :class:`MountManager` for root
        mountpoint, None until setup has been called
    """
[docs]
def post_init(self, custom_args):
    """
    Post initialization method

    Stores the custom btrfs arguments, completes them with default
    values and works out the name of the root volume

    :param dict custom_args: custom btrfs volume manager arguments
    """
    self.custom_args = custom_args if custom_args else {}

    # only options the caller did not provide receive their default
    for option, fallback in (
        ('root_label', 'ROOT'),
        ('root_is_snapshot', False),
        ('btrfs_default_volume_requested', True),
        ('root_is_readonly_snapshot', False),
        ('root_is_subvolume', None),
        ('quota_groups', False)
    ):
        self.custom_args.setdefault(option, fallback)

    self.root_volume_name = '/'
    self.default_volume_name = self.root_volume_name
    if self._has_root_volume():
        # '@' is used unless a root volume with an own name is configured
        self.root_volume_name = '@'
        for volume in self.get_canonical_volume_list().volumes:
            if volume.is_root_volume and volume.name:
                self.root_volume_name = volume.name
        self.default_volume_name = self.root_volume_name

    snapshot_without_root_volume = \
        self.custom_args['root_is_snapshot'] and \
        self.root_volume_name == '/'
    if snapshot_without_root_volume:
        log.warning('root_is_snapshot requires a toplevel sub-volume')
        log.warning('root_is_snapshot has been disabled')
        self.custom_args['root_is_snapshot'] = False

    self.subvol_mount_list = []
    self.toplevel_mount = None
[docs]
def setup(self, name=None):
    """
    Setup btrfs volume management

    Creates the btrfs filesystem on the device and mounts its
    toplevel. Unless the root volume name is '/', a toplevel
    subvolume of that name is created and looked up as default
    volume. If snapshots are activated via the custom_args, the
    .snapshots and .snapshots/1/snapshot subvolumes are created below
    the root volume and the first snapshot becomes the default volume.
    There is no concept of a volume manager name, thus the name
    argument is not used for btrfs

    :param string name: unused
    """
    self.setup_mountpoint()

    mapped_device = MappedDevice(
        device=self.device, device_provider=self.device_provider_root
    )
    with FileSystem.new(
        name='btrfs',
        device_provider=mapped_device,
        custom_args=self.custom_filesystem_args
    ) as filesystem:
        filesystem.create_on_device(label=self.custom_args['root_label'])

    self.toplevel_mount = MountManager(
        device=self.device, mountpoint=self.mountpoint
    )
    self.toplevel_mount.mount(self.custom_filesystem_args['mount_options'])

    if self.custom_args['quota_groups']:
        Command.run(['btrfs', 'quota', 'enable', self.mountpoint])

    has_named_root = self.root_volume_name != '/'
    if has_named_root:
        Command.run(
            [
                'btrfs', 'subvolume', 'create',
                self.mountpoint + f'/{self.root_volume_name}'
            ]
        )

    if not self.custom_args['root_is_snapshot']:
        if has_named_root:
            self._set_default_volume(self.root_volume_name)
        return

    # subvolume names relative to the toplevel of the filesystem
    snapshots_subvol = f'{self.root_volume_name}/.snapshots'
    first_snapshot_subvol = f'{snapshots_subvol}/1/snapshot'

    snapshots_dir = self.mountpoint + '/' + snapshots_subvol
    Command.run(['btrfs', 'subvolume', 'create', snapshots_dir])
    os.chmod(snapshots_dir, 0o700)
    Path.create(snapshots_dir + '/1')

    first_snapshot_dir = self.mountpoint + '/' + first_snapshot_subvol
    Command.run(['btrfs', 'subvolume', 'create', first_snapshot_dir])
    self._set_default_volume(first_snapshot_subvol)

    # Mount /{some-name}/.snapshots as /.snapshots inside the root
    self.subvol_mount_list.append(
        MountManager(
            device=self.device,
            attributes={
                'subvol_path': snapshots_subvol,
                'subvol_name': snapshots_subvol
            },
            mountpoint=first_snapshot_dir + '/.snapshots'
        )
    )
[docs]
def get_root_volume_name(self) -> str:
    """
    Provides the name stored as default volume name, which is the
    root volume name or the volume last passed to
    _set_default_volume

    :return: directory path name

    :rtype: string
    """
    root_volume_name = self.default_volume_name
    return root_volume_name
[docs]
def create_volumes(self, filesystem_name):
    """
    Create configured btrfs subvolumes

    Any btrfs subvolume is of the same btrfs filesystem. There is no
    way to have different filesystems per btrfs subvolume. Thus
    the filesystem_name is only used for the log message.

    For every volume except the root volume the subvolume is created,
    quota and attributes are applied and a :class:`MountManager`
    is added to the subvolume mount list

    :param string filesystem_name: unused except for logging
    """
    log.info('Creating %s sub volumes', filesystem_name)
    self.create_volume_paths_in_root_dir()

    volume_list = self.get_canonical_volume_list()
    if volume_list.full_size_volume:
        # an eventual fullsize volume needs no extra handling on
        # btrfs, it is processed like any other volume
        volume_list.volumes.append(volume_list.full_size_volume)

    for volume in volume_list.volumes:
        if volume.is_root_volume:
            # the btrfs root volume has been created as
            # part of the setup procedure
            continue

        log.info('--> sub volume %s', volume.realpath)

        # a configured parent takes precedence over the root volume
        toplevel = os.path.normpath(
            self.mountpoint + os.sep + (
                volume.parent or self.root_volume_name
            )
        )
        subvolume_dir = os.path.normpath(
            toplevel + os.sep + volume.realpath
        )

        Path.create(os.path.dirname(subvolume_dir))
        Command.run(['btrfs', 'subvolume', 'create', subvolume_dir])
        self._apply_quota(subvolume_dir, volume.attributes)
        self.apply_attributes_on_volume(toplevel, volume)

        if self.custom_args['root_is_snapshot']:
            # the volume gets mounted below the first snapshot
            snapshot_subvol = os.path.normpath(
                self.root_volume_name + os.sep + volume.realpath
            )
            attributes = {
                'subvol_path': snapshot_subvol,
                'subvol_name': snapshot_subvol
            }
            mountpoint_parts = [
                self.mountpoint +
                f'/{self.root_volume_name}/.snapshots/1/snapshot/',
                '',
                volume.realpath
            ]
        else:
            attributes = {
                'parent': volume.parent or '',
                'subvol_path': os.path.normpath(
                    toplevel.replace(
                        self.mountpoint, ''
                    ) + os.sep + volume.realpath
                ).lstrip(os.sep),
                'subvol_name': volume.name
            }
            # NOTE(review): toplevel usually ends in the root volume
            # name already, so it looks like the name shows up twice in
            # this mountpoint - confirm this is intended
            mountpoint_parts = [
                toplevel, self.root_volume_name, volume.realpath
            ]

        self.subvol_mount_list.append(
            MountManager(
                device=self.device,
                attributes=attributes,
                mountpoint=os.path.normpath(os.sep.join(mountpoint_parts))
            )
        )
[docs]
def get_fstab(
    self, persistency_type: str = 'by-label', filesystem_name: str = ''
) -> List[str]:
    """
    Implements creation of the fstab entries. The method
    returns a list of fstab compatible entries, one per entry
    of the subvolume mount list

    The mount point of an entry is the subvolume path without the
    leading root volume name. Only the leading path component is
    removed, a later occurrence of the root volume name inside of
    the path is kept as it is

    :param string persistency_type: by-label | by-uuid
    :param string filesystem_name: unused

    :return: list of fstab entries

    :rtype: list
    """
    fstab_entries = []
    mount_options = \
        self.custom_filesystem_args['mount_options'] or ['defaults']
    block_operation = BlockID(self.device)
    blkid_type = 'LABEL' if persistency_type == 'by-label' else 'UUID'
    device_id = block_operation.get_blkid(blkid_type)
    root_name = self.root_volume_name
    for volume_mount in self.subvol_mount_list:
        subvol_path = volume_mount.get_attributes().get('subvol_path')
        mount_point = subvol_path
        # Delete root_volume_name from mountpoint path if it is the
        # leading path component. str.replace must not be used here
        # because it would also drop matches later in the path,
        # e.g. '@/home/@data' would turn into '/home/data'
        if root_name != '/' and (
            mount_point == root_name or
            mount_point.startswith(root_name + os.sep)
        ):
            mount_point = mount_point[len(root_name):]
        if not mount_point.startswith(os.sep):
            mount_point = f'{os.sep}{mount_point}'
        mount_entry_options = mount_options + [
            'subvol=' + subvol_path.lstrip(os.sep)
        ]
        fs_check = self._is_volume_enabled_for_fs_check(
            volume_mount.mountpoint
        )
        fstab_entries.append(
            ' '.join(
                [
                    blkid_type + '=' + device_id,
                    mount_point,
                    'btrfs', ','.join(mount_entry_options),
                    '0 {fs_passno}'.format(
                        fs_passno='2' if fs_check else '0'
                    )
                ]
            )
        )
    return fstab_entries
[docs]
def get_volumes(self):
    """
    Return dict of volumes

    The dictionary is keyed by the subvolume path without the leading
    root volume name. Only the leading path component is removed, a
    later occurrence of the root volume name inside of the path is
    kept as it is. Each value holds the mount options
    (volume_options) and the device (volume_device) of the volume

    :return: volumes dictionary

    :rtype: dict
    """
    volumes = {}
    # tolerate unset mount options like get_fstab does
    mount_options = self.custom_filesystem_args['mount_options'] or []
    root_name = self.root_volume_name
    for volume_mount in self.subvol_mount_list:
        subvol_path = volume_mount.get_attributes().get('subvol_path')
        subvol_options = ','.join(
            ['subvol=' + subvol_path] + mount_options
        )
        volume_key = subvol_path
        # str.replace must not be used to drop the root volume name
        # because it would also drop matches later in the path,
        # e.g. '@/home/@data' would turn into '/home/data'
        if root_name != '/' and (
            volume_key == root_name or
            volume_key.startswith(root_name + os.sep)
        ):
            volume_key = volume_key[len(root_name):]
        volumes[volume_key] = {
            'volume_options': subvol_options,
            'volume_device': volume_mount.device
        }
    return volumes
[docs]
def mount_volumes(self):
    """
    Mount btrfs subvolumes

    Mounts the toplevel of the filesystem first, followed by every
    entry of the subvolume mount list. A missing mountpoint directory
    is created prior to mounting the subvolume
    """
    fs_mount_options = self.custom_filesystem_args['mount_options']
    self.toplevel_mount.mount(fs_mount_options)

    for subvol_mount in self.subvol_mount_list:
        if not os.path.exists(subvol_mount.mountpoint):
            Path.create(subvol_mount.mountpoint)
        subvol_option = \
            'subvol=' + subvol_mount.get_attributes().get('subvol_path')
        # all options are handed over as one comma separated string
        subvol_mount.mount(
            options=[
                ','.join(
                    [subvol_option] +
                    self.custom_filesystem_args['mount_options']
                )
            ]
        )
[docs]
def umount_volumes(self) -> None:
    """
    Umount btrfs subvolumes

    The subvolumes are released in the reverse order of the
    subvolume mount list, followed by the toplevel mount. Only
    mounts which report to be mounted are touched. If there is
    no toplevel mount because setup was not called, the toplevel
    is skipped instead of failing on the unset attribute
    """
    for volume_mount in reversed(self.subvol_mount_list):
        if volume_mount.is_mounted():
            volume_mount.umount()
    # toplevel_mount stays None until setup has created it
    if self.toplevel_mount and self.toplevel_mount.is_mounted():
        self.toplevel_mount.umount()
[docs]
def get_mountpoint(self) -> str:
    """
    Provides btrfs root mount point directory

    The path consists of the mountpoint, followed by the root volume
    name unless it is '/', followed by .snapshots/1/snapshot if the
    root is a snapshot. Effective use of the directory is guaranteed
    only after sync_data

    :raises KiwiVolumeManagerSetupError: if no mountpoint is set

    :return: directory path name

    :rtype: string
    """
    if not self.mountpoint:
        raise KiwiVolumeManagerSetupError("No mountpoint exists")

    path_elements: List[str] = [self.mountpoint]
    if self.root_volume_name != '/':
        path_elements += [self.root_volume_name]
        if self.custom_args.get('root_is_snapshot'):
            path_elements += ['.snapshots', '1', 'snapshot']
    return os.path.join(*path_elements)
[docs]
def sync_data(self, exclude=None):
    """
    Sync data into btrfs filesystem

    The root directory is synced to the path provided by
    get_mountpoint. If snapshots are activated this is the first
    snapshot and the snapshot info.xml is written prior to the sync.
    With quota groups and snapshots activated the snapper quota
    configuration is created after the sync. Nothing is done if
    there is no toplevel mount

    :param list exclude: files to exclude from sync
    """
    if not self.toplevel_mount:
        return

    target_dir = self.get_mountpoint()
    if self.custom_args['root_is_snapshot']:
        info_file = self.mountpoint + \
            f'/{self.root_volume_name}/.snapshots/1/info.xml'
        self._create_snapshot_info(info_file)

    DataSync(self.root_dir, target_dir).sync_data(
        options=Defaults.get_sync_options(), exclude=exclude
    )

    if self.custom_args['quota_groups']:
        if self.custom_args['root_is_snapshot']:
            self._create_snapper_quota_configuration()
[docs]
def set_property_readonly_root(self):
    """
    Sets the root volume to be a readonly filesystem

    The btrfs 'ro' property is set on the path provided by
    get_mountpoint, but only if both custom arguments
    root_is_snapshot and root_is_readonly_snapshot are set
    """
    as_snapshot = self.custom_args['root_is_snapshot']
    as_readonly = self.custom_args['root_is_readonly_snapshot']
    if not (as_snapshot and as_readonly):
        return
    Command.run(
        ['btrfs', 'property', 'set', self.get_mountpoint(), 'ro', 'true']
    )
def _apply_quota(self, volume_path: str, attributes: List[str]):
for attribute in attributes:
if attribute.startswith('quota='):
quota = attribute.split('=')[1]
Command.run(
['btrfs', 'quota', 'enable', '--simple', volume_path]
)
Command.run(
['btrfs', 'qgroup', 'limit', quota, volume_path]
)
def _has_root_volume(self) -> bool:
has_root_volume = bool(self.custom_args['root_is_subvolume'])
if self.custom_args['root_is_subvolume'] is None:
# root volume not explicitly configured, will
# be enabled by default but this is going to change
# in the future. Print a deprecation message to inform
# the user about a potential behavior change
log.warning("Implicitly creating root volume")
log.warning(
"--> Future versions of kiwi will not do this anymore"
)
log.warning(
"--> Please specify btrfs_root_is_subvolume true|false"
)
has_root_volume = True
return has_root_volume
def _is_volume_enabled_for_fs_check(self, mountpoint):
for volume in self.volumes:
if volume.realpath in mountpoint:
if 'enable-for-filesystem-check' in volume.attributes:
return True
return False
def _set_default_volume(self, default_volume):
    """
    Look up the given volume in the subvolume list of the mounted
    filesystem and store its name as default volume name. The volume
    is also set as btrfs default volume unless the custom argument
    btrfs_default_volume_requested is disabled

    :param string default_volume: subvolume path as listed by btrfs

    :raises KiwiVolumeRootIDError: if the volume is not listed
    """
    listing = Command.run(
        ['btrfs', 'subvolume', 'list', self.mountpoint]
    ).output
    for line in listing.split('\n'):
        match = re.search(r'ID (\d+) .*path (.*)', line)
        if not match:
            continue
        volume_id, volume_path = match.group(1), match.group(2)
        if volume_path != default_volume:
            continue
        if self.custom_args['btrfs_default_volume_requested']:
            Command.run(
                [
                    'btrfs', 'subvolume', 'set-default',
                    volume_id, self.mountpoint
                ]
            )
        self.default_volume_name = default_volume
        return

    raise KiwiVolumeRootIDError(
        'Failed to find btrfs volume: %s' % default_volume
    )
def _xml_pretty(self, toplevel_element):
xml_data_unformatted = ElementTree.tostring(
toplevel_element, 'utf-8'
)
xml_data_domtree = minidom.parseString(xml_data_unformatted)
return xml_data_domtree.toprettyxml(indent=" ")
def _create_snapper_quota_configuration(self):
    """
    Setup the snapper quota configuration inside of the first
    snapshot. Nothing is done if no snapper config template file
    is provided for the snapshot root
    """
    snapshot_root = os.sep.join(
        [
            self.mountpoint,
            f'{self.root_volume_name}/.snapshots/1/snapshot'
        ]
    )
    template = Defaults.get_snapper_config_template_file(snapshot_root)
    if not template:
        return

    # snapper requires an extra parent qgroup to operate with quotas
    Command.run(['btrfs', 'qgroup', 'create', '1/0', self.mountpoint])

    snapper_config = self._set_snapper_sysconfig_file(snapshot_root)
    if not os.path.exists(snapper_config):
        # start from the template if there is no config yet
        shutil.copyfile(template, snapper_config)

    Command.run(
        [
            'chroot', snapshot_root, 'snapper', '--no-dbus', 'set-config',
            'QGROUP=1/0'
        ]
    )
@staticmethod
def _set_snapper_sysconfig_file(root_path):
    """
    Make sure etc/sysconfig/snapper below the given root names
    exactly one snapper config and provide the path to that config.
    An unset or empty SNAPPER_CONFIGS value is set to "root" and
    written back to the file

    :param string root_path: root directory of the system

    :raises KiwiVolumeManagerSetupError:
        if SNAPPER_CONFIGS holds more than one whitespace
        separated value

    :return: path below etc/snapper/configs in root_path

    :rtype: string
    """
    sysconfig = SysConfig(
        os.sep.join([root_path, 'etc/sysconfig/snapper'])
    )
    config_is_unset = not sysconfig.get('SNAPPER_CONFIGS') or \
        not sysconfig['SNAPPER_CONFIGS'].strip('\"')
    if config_is_unset:
        sysconfig['SNAPPER_CONFIGS'] = '"root"'
        sysconfig.write()
    elif len(sysconfig['SNAPPER_CONFIGS'].split()) > 1:
        raise KiwiVolumeManagerSetupError(
            'Unsupported SNAPPER_CONFIGS value: {0}'.format(
                sysconfig['SNAPPER_CONFIGS']
            )
        )
    config_name = sysconfig['SNAPPER_CONFIGS'].strip('\"')
    return os.sep.join([root_path, 'etc/snapper/configs', config_name])
def _create_snapshot_info(self, filename):
date_info = datetime.datetime.now()
snapshot = ElementTree.Element('snapshot')
snapshot_type = ElementTree.SubElement(snapshot, 'type')
snapshot_type.text = 'single'
snapshot_number = ElementTree.SubElement(snapshot, 'num')
snapshot_number.text = '1'
snapshot_description = ElementTree.SubElement(snapshot, 'description')
snapshot_description.text = 'first root filesystem'
snapshot_date = ElementTree.SubElement(snapshot, 'date')
snapshot_date.text = date_info.strftime("%Y-%m-%d %H:%M:%S")
with open(filename, 'w') as snapshot_info_file:
snapshot_info_file.write(self._xml_pretty(snapshot))
def __exit__(self, exc_type, exc_value, traceback):
if self.toplevel_mount:
self.umount_volumes()