Skip to content

Commit

Permalink
driver/usbstoragedriver: add write_files method
Browse files Browse the repository at this point in the history
write_image, especially with the seek and skip parameters, can be useful
for fast iterations during bootloader development: Only the bootloader
is copied and the rest of the storage medium stays intact.

Some boot firmware however loads subsequent boot stages from a, usually
FAT32, filesystem, notably UEFI firmware from the EFI system partition.

Make development on such targets easier by adding a write_files method
that temporarily mounts the target device using udisks2 and writes the
specified files there.

Signed-off-by: Stefan Wiehler <stefan.wiehler@missinglinkelectronics.com>
Co-authored-by: Ahmad Fatoum <a.fatoum@pengutronix.de>
Signed-off-by: Ahmad Fatoum <a.fatoum@pengutronix.de>
  • Loading branch information
Stefan Wiehler and a3f committed Jun 25, 2023
1 parent 707255b commit b2b1b99
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 1 deletion.
2 changes: 1 addition & 1 deletion doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ Used by:
- `USBStorageDriver`_

The NetworkUSBMassStorage can be used in test cases by calling the
`write_image()`, and `get_size()` functions.
`write_files()`, `write_image()`, and `get_size()` functions.

SigrokDevice
~~~~~~~~~~~~
Expand Down
113 changes: 113 additions & 0 deletions labgrid/driver/usbstoragedriver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import enum
import logging
import os
import pathlib
import re
import time
import subprocess

Expand Down Expand Up @@ -43,6 +45,8 @@ class USBStorageDriver(Driver):
)
WAIT_FOR_MEDIUM_TIMEOUT = 10.0 # s
WAIT_FOR_MEDIUM_SLEEP = 0.5 # s
UNMOUNT_MAX_RETRIES = 5
UNMOUNT_BUSY_WAIT = 3 # s

def __attrs_post_init__(self):
super().__attrs_post_init__()
Expand All @@ -54,6 +58,115 @@ def on_activate(self):
def on_deactivate(self):
pass

def _mount(self, partition):
target = self._get_devpath(partition)
args = ["udisksctl", "mount", "--no-user-interaction", "-b", target]
self._wait_for_medium(partition)

try:
result = processwrapper.check_output(self.storage.command_prefix + args)
except subprocess.CalledProcessError as e:
output = e.output.decode('utf-8').rstrip()
self.logger.warning('%s', output)

# udisksctl's already mounted error message does't inspire confidence
# with respect to stability, so we best effort match it and unmount/remount
# or bail out. Message is currently:
# Error mounting ...: GDBus.Error:org.freedesktop.UDisks2.Error.AlreadyMounted:
# Device ... is already mounted at `...'.
if re.search('already.?mounted', output, re.IGNORECASE) is None:
raise e

self.logger.warning('Unmounting lazily and remounting %s...', target)
self._unmount_lazy(partition)
result = processwrapper.check_output(self.storage.command_prefix + args)

capture = re.search('^Mounted (.*) at (.*)$', result.decode('utf-8'))
if capture is None:
self._unmount_lazy(partition)
raise RuntimeError("couldn't determine mount path")

return capture.group(2)

def _unmount_lazy(self, partition):
target = self._get_devpath(partition)
args = ["udisksctl", "unmount", "--no-user-interaction", "-b", target, "--force"]

try:
processwrapper.check_output(self.storage.command_prefix + args)
except subprocess.CalledProcessError as e:
# Error unmounting ...: GDBus.Error:org.freedesktop.UDisks2.Error.NotMounted:
# Device `...' is not mounted
if re.search(b'not.?mounted', e.output, re.IGNORECASE) is None:
raise

def _unmount(self, partition):
target = self._get_devpath(partition)
args = ["udisksctl", "unmount", "--no-user-interaction", "-b", target]

for _ in range(self.UNMOUNT_MAX_RETRIES):
try:
processwrapper.check_output(self.storage.command_prefix + args)
return
except subprocess.CalledProcessError as e:
output = e.output.decode('utf-8').rstrip()
# Error unmounting ...: GDBus.Error:org.freedesktop.UDisks2.Error.DeviceBusy:
# Error unmounting ...: target is busy
if re.search('busy', output) is None:
raise e

self.logger.info('%s wait for %s s', output, self.UNMOUNT_BUSY_WAIT)
time.sleep(self.UNMOUNT_BUSY_WAIT)

# Last resort: raise an exception to trigger the lazy unmount
raise ExecutionError(f"Timeout while waiting to unmount {target}")

@Driver.check_active
@step(args=['sources', 'target', 'partition', 'target_is_directory'])
def write_files(self, sources, target, partition, target_is_directory=True):
"""
Write the file(s) specified by filename(s) to the
bound USB storage partition.
Args:
sources (List[str]): path(s) to the file(s) to be copied to the bound USB storage
partition.
target (str): target directory or file to copy to
partition (int): mount the specified partition or None to mount the whole disk
target_is_directory (bool): Whether target is a directory
"""

mount_path = self._mount(partition)

try:
# (pathlib.PurePath(...) / "/") == "/", so we turn absolute paths into relative
# paths with respect to the mount point here
target_rel = target.relative_to(target.root) if target.root is not None else target
target_path = str(pathlib.PurePath(mount_path) / target_rel)

copied_sources = []

for f in sources:
mf = ManagedFile(f, self.storage)
mf.sync_to_resource()
copied_sources = copied_sources + [mf.get_remote_path()]

if target_is_directory:
args = ["cp", "-t", target_path] + copied_sources
else:
if len(sources) != 1:
raise ValueError("single source argument required when target_is_directory=False")

args = ["cp", "-T", copied_sources[0], target_path]

processwrapper.check_output(self.storage.command_prefix + args)
self._unmount(partition)
except:
# We are going to die with an exception anyway, so no point in waiting
# to make sure everything has been written before continuing
self._unmount_lazy(partition)
raise

@Driver.check_active
@step(args=['filename'])
def write_image(self, filename=None, mode=Mode.DD, partition=None, skip=0, seek=0):
Expand Down

0 comments on commit b2b1b99

Please sign in to comment.