From 21f8c15cc7d550222e5b07826b272345bd93876f Mon Sep 17 00:00:00 2001 From: Mauro Carvalho Chehab Date: Wed, 10 Jul 2024 14:15:24 +0200 Subject: [PATCH] util/arm_einj.py: add an utility for ARM error injection via QEMU Testing rasdaemon is not easy, as it depends on either having real hardware producing events or a test BIOS. This is usually not available and/or not too reliable. So, take a different approach by adding a QEMU QAPI designed for doing hardware error injection. The QEMU patches are at: https://gitlab.com/mchehab_kernel/qemu/-/tree/arm-error-inject-v2 And some instructions about how to use it are at rasdaemon wiki pages at github: https://github.com/mchehab/rasdaemon/wiki Add the error injection tool to rasdaemon sources. Signed-off-by: Mauro Carvalho Chehab --- util/arm_einj.py | 365 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 365 insertions(+) create mode 100755 util/arm_einj.py diff --git a/util/arm_einj.py b/util/arm_einj.py new file mode 100755 index 0000000..114997e --- /dev/null +++ b/util/arm_einj.py @@ -0,0 +1,365 @@ +#!/usr/bin/env python3 +# +# pylint: disable=C0301, C0114 +# SPDX-License-Identifier: GPL-2.0 +# +# Copyright (C) 2024 Mauro Carvalho Chehab + +import argparse +import json +import socket +import sys + +einj_description= """ +Do UEFI CPER firmware-first error injections via QEMU. + +This small utility works together with those QEMU additions: + https://gitlab.com/mchehab_kernel/qemu/-/tree/arm-error-inject-v2 + +It allows using UEFI BIOS EINJ features on ARM32/64 emulation to do ARM +processor error injection, to be decoded by Linux CPER and GHES drivers, +helping to test rasdaemon firmware-first error handling for ARM processor +events, being compatible with UEFI 2.9A Errata. +""" + + +# Valid choice values +ARM_valid_bits = { + "mpidr": "mpidr-valid", + "affinity": "affinity-valid", + "running": "running-state-valid", + "vendor": "vendor-specific-valid" +} + +PEI_flags = { + "first": "first-error-cap", + "last": "last-error-cap", + "propagated": "propagated", + "overflow": "overflow", +} + +PEI_error_types = { + "cache": "cache-error", + "tlb": "tlb-error", + "bus": "bus-error", + "micro-arch": "micro-arch-error", + "vendor": "micro-arch-error" +} + +PEI_valid_bits = { + "multiple-error": "multiple-error-valid", + "flags": "flags-valid", + "error": "error-info-valid", + "virt": "virt-addr-valid", + "virtual": "virt-addr-valid", + "phy": "phy-addr-valid", + "physical": "phy-addr-valid" +} + +def einj_command(host, port, commands): + """Send commands to QEMU though QMP TCP socket""" + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((host, port)) + + data = s.recv(1024) + + print("\t", data.decode('utf-8'), end="") + + for command in commands: + print(command) + + s.sendall(command.encode('utf-8')) + data = s.recv(1024) + print("\t", data.decode('utf-8'), end="") + + s.shutdown(socket.SHUT_WR) + while 1: + data = s.recv(1024) + if data == b"": + break + print("\t", data.decode('utf-8')) + + s.close() + +def get_choice(name, value, choices, suffixes=None): + new_values = [] + + if not value: + return new_values + + for val in value.split(","): + val = val.lower() + + if suffixes: + for suffix in suffixes: + val = val.removesuffix(suffix) + + if val not in choices.keys(): + sys.exit(f"Error on '{name}': choice {val} is invalid.") + + val = choices[val] + + new_values.append(val) + + return new_values + +def get_mult_array(mult, name, values, allow_zero=False, max_val=None): + if not allow_zero: + if not values: + return + else: + if values is None: + return + elif not values: + i = 0 + if i not in mult: + mult[i] = {} + + mult[i][name]=[] + return + + i = 0 + for value in values: + for val in value.split(","): + try: + val = int(val, 0) + except: + sys.exit(f"Error on '{name}': {val} is not an integer") + + if val < 0: + sys.exit(f"Error on '{name}': {val} is not unsigned") + + if max_val and val > max_val: + sys.exit(f"Error on '{name}': {val} is too big") + + if i not in mult: + mult[i] = {} + + if name not in mult[i]: + mult[i][name] = [] + + mult[i][name].append(val) + + i += 1 + +def get_mult_choices(mult, name, values, choices, + suffixes = None, allow_zero=False): + if not allow_zero: + if not values: + return + else: + if values is None: + return + + i = 0 + for val in values: + new_values = get_choice(name, val, choices, suffixes) + + if i not in mult: + mult[i] = {} + + mult[i][name] = new_values + i += 1 + +def get_mult_int(mult, name, values, allow_zero=False): + if not allow_zero: + if not values: + return + else: + if values is None: + return + + i = 0 + for val in values: + try: + val = int(val, 0) + except: + sys.exit(f"Error on '{name}': {val} is not an integer") + + if val < 0: + sys.exit(f"Error on '{name}': {val} is not unsigned") + + if i not in mult: + mult[i] = {} + + mult[i][name] = val + i += 1 + +class errorInjection: + def __init__(self, args=None, arm=None): + + """Initialize the error injection class. There are two possible + ways to initialize it: + 1. passing a set of arguments; + 2. passing a dict with error inject command parameters. Each + column is handled in separate.""" + + self.arm = {} + + if not args: + self.args = args + return + + print("ARGS:", args) + + pei = {} + ctx = {} + vendor = {} + + # Handle global parameters + if args.arm: + self.arm["validation"] = get_choice(name="validation", value=args.arm, + choices=ARM_valid_bits, + suffixes=["-error", "-err"]) + + if args.affinity: + self.arm["affinity-level"] = args.affinity + + if args.mpidr: + self.arm["mpidr-el1"] = args.mpidr + + if args.midr: + self.arm["midr-el1"] = args.midr + + if args.running is not None: + if args.running: + self.arm["running-state"] = ["processor-running"] + else: + self.arm["running-state"] = [] + + if args.psci: + self.arm["psci-state"] = args.psci + + + # Handle PEI + if not args.type: + args.type = ['cache-error'] + + get_mult_choices(pei, name="validation", values=args.pei_valid, + choices=PEI_valid_bits, + suffixes=["-valid", "-info", "--information", "--addr"]) + get_mult_choices(pei, name="type", values=args.type, + choices=PEI_error_types, + suffixes=["-error", "-err"]) + get_mult_choices(pei, name="flags", values=args.flags, + choices=PEI_flags, + suffixes=["-error", "-cap"]) + get_mult_int(pei, "multiple-error", args.multiple_error) + get_mult_int(pei, "phy-addr", args.physical_address) + get_mult_int(pei, "vir-addr", args.virtual_address) + + # Handle context + get_mult_int(ctx, "type", args.ctx_type, allow_zero=True) + get_mult_int(ctx, "minimal-size", args.ctx_size, allow_zero=True) + get_mult_array(ctx, "register", args.ctx_array, allow_zero=True) + + get_mult_array(vendor, "bytes", args.vendor, max_val=255) + + # Store PEI + self.arm["error"] = [] + for k in sorted(pei.keys()): + self.arm["error"].append(pei[k]) + + # Store Context + if ctx: + self.arm["context"] = [] + for k in sorted(ctx.keys()): + self.arm["context"].append(ctx[k]) + + # Vendor-specific bytes are not grouped + if vendor: + self.arm["vendor-specific"] = [] + for k in sorted(vendor.keys()): + self.arm["vendor-specific"] += vendor[k]["bytes"] + + def run(self, host, port): + + # Fill the commands to be sent + commands = [] + + # Needed to negotiate QMP and for QEMU to accept the command + commands.append('{ "execute": "qmp_capabilities" } ') + + command = '{ "execute": "arm-inject-error", ' + command += '"arguments": ' + json.dumps(self.arm) + ' }' + + commands.append(command) + + einj_command(host, port, commands) + + +def handle_args(): + parser = argparse.ArgumentParser(prog="einj.py", + usage="%(prog)s [options]", + description=einj_description, + epilog="If a field is not defined, a default value will be applied by QEMU.") + + # TCP QEMU QMP host/port + g_options = parser.add_argument_group("Socket options") + g_options.add_argument("-H", "--host", default="localhost", type=str, + help="host name") + g_options.add_argument("-P", "--port", default=4445, type=int, + help="TCP port number") + + # UEFI N.16 ARM Validation bits + g_arm = parser.add_argument_group("ARM processor") + g_arm.add_argument("--arm", "--arm-valid", + help="ARM validation bits: mpidr,affinity,running,vendor") + g_arm.add_argument("-a", "--affinity", "--level", "--affinity-level", type=lambda x: int(x, 0), + help="Affinity level (when multiple levels apply)") + g_arm.add_argument("-l", "--mpidr", type=lambda x: int(x, 0), + help="Multiprocessor Affinity Register") + g_arm.add_argument("-i", "--midr", type=lambda x: int(x, 0), + help="Main ID Register") + g_arm.add_argument("-r", "--running", + action=argparse.BooleanOptionalAction, default=None, + help="Indicates if the processor is running or not") + g_arm.add_argument("--psci", "--psci-state", type=lambda x: int(x, 0), + help="Power State Coordination Interface - PSCI state") + + # TODO: Add vendor-specific support + + # UEFI N.17 bitmaps (type and flags) + g_pei = parser.add_argument_group("ARM Processor Error Info (PEI)") + g_pei.add_argument("-t", "--type", nargs="+", + help="one or more error types: cache,tlb,bus,vendor") + g_pei.add_argument("-f", "--flags", nargs="*", + help="zero or more error flags: first-error,last-error,propagated,overflow") + g_arm.add_argument("-V", "--pei-valid", "--error-valid", nargs="*", + help="zero or more PEI validation bits: multiple-error,flags,error-info,virt,phy") + + # UEFI N.17 Integer values + g_pei.add_argument("-m", "--multiple-error", nargs="+", + help="Number of errors: 0: Single error, 1: Multiple errors, 2-65535: Error count if known") + g_pei.add_argument("-e", "--error-info", nargs="+", + help="Error information (UEFI 2.10 tables N.18 to N.20)") + g_pei.add_argument("-p", "--physical-address", nargs="+", + help="Physical address") + g_pei.add_argument("-v", "--virtual-address", nargs="+", + help="Virtual address") + + # UEFI N.21 Context + g_pei.add_argument("--ctx-type", "--context-type", nargs="*", + help="Type of the context (0=ARM32 GPR, 5=ARM64 EL1, other values supported)") + g_pei.add_argument("--ctx-size", "--context-size", nargs="*", + help="Minimal size of the context") + g_pei.add_argument("--ctx-array", "--context-array", nargs="*", + help="Comma-separated arrays for each context") + + # Vendor-specific data + g_pei.add_argument("--vendor", "--vendor-specific", nargs="+", + help="Vendor-specific byte arrays of data") + + return parser.parse_args() + +def main(): + """Main program""" + + args = handle_args() + + einj = errorInjection(args) + einj.run(args.host, args.port) + +if __name__ == '__main__': + main()