import sys, os, re, time, getopt, shlex, inspect, xnudefines
import lldb
import uuid
import base64
import json
from importlib import reload
from importlib.util import find_spec
from functools import wraps
from ctypes import c_ulonglong as uint64_t
from ctypes import c_void_p as voidptr_t
import core
from core import caching
from core.standard import *
from core.configuration import *
from core.kernelcore import *
from utils import *
from core.lazytarget import *

# Name under which this module registers its lldb commands/summaries.
MODULE_NAME = __name__

""" Kernel Debugging macros for lldb. Please make sure you read the README COMPLETELY BEFORE reading anything below.
    It is very critical that you read coding guidelines in Section E in README file.
"""

# Help text appended to every command's docstring by lldb_command().
COMMON_HELP_STRING = """
    -h  Show the help string for the command.
    -c [always|auto|never|0|1]  Control the colorized output of certain commands
    -o  The output of this command execution will be saved to file. Parser information or errors will
        not be sent to file though. eg /tmp/output.txt
    -s  The "filter_string" param is parsed to python regex expression and each line of output
        will be printed/saved only if it matches the expression.
    -v [-v...]  Each additional -v will increase the verbosity of the command.
    -p  Send the output of the command to plugin. Please see README for usage of plugins.
"""

# End Utility functions
# Debugging specific utility functions

# decorators. Not to be called directly.

def static_var(var_name, initial_value):
    """ Decorator: attach attribute 'var_name' with 'initial_value' to the
        decorated object (poor man's static variable).
    """
    def _set_var(obj):
        setattr(obj, var_name, initial_value)
        return obj
    return _set_var

def header(initial_value):
    """ Decorator: attach a 'header' attribute (a column-header string) to the
        decorated function; used by lldb_type_summary when printing summaries.
    """
    def _set_header(obj):
        setattr(obj, 'header', initial_value)
        return obj
    return _set_header

def md_header(fmt, args):
    """ Decorator: build a markdown table header row plus its '---' separator
        row from format string 'fmt' and column names 'args', stored on the
        decorated function as the 'markdown' attribute.
    """
    def _set_md_header(obj):
        header = "|" + "|".join(fmt.split(" ")).format(*args) + "|"
        # One run of dashes per column, sized to the formatted column width.
        colhead = map(lambda fmt, col: "-"*len(fmt.format(col)), fmt.split(" "), args)
        sub_header = "|" + "|".join(colhead) + "|"
        setattr(obj, 'markdown', "\n".join([header, sub_header]))
        return obj
    return _set_md_header

# holds type declarations done by xnu.
#DONOTTOUCHME: Exclusive use of lldb_type_summary only.
lldb_summary_definitions = {}

def lldb_type_summary(types_list):
    """ A function decorator to register a summary for a type in lldb.
        params: types_list - [] an array of types that you wish to register a summary callback function. (ex. ['task *', 'task_t'])
        returns: Nothing. This is a decorator.
    """
    def _get_summary(obj):
        summary_function_name = "LLDBSummary" + obj.__name__

        def _internal_summary_function(lldbval, internal_dict):
            # FIX: inspect.getargspec() was removed in Python 3.11; use
            # getfullargspec(), whose .args field is the same positional list.
            args = inspect.getfullargspec(obj).args
            if 'O' in args:
                # Summary function supports fancy output; hand it a stream.
                stream = CommandOutput(summary_function_name, fhandle=sys.stdout)
                with RedirectStdStreams(stdout=stream), caching.ImplicitContext(lldbval):
                    return '\n' + obj.header + '\n' + obj(core.value(lldbval), O=stream)

            out_string = ""
            if internal_dict is not None and len(obj.header) > 0:
                out_string += "\n" + obj.header + "\n"
            with caching.ImplicitContext(lldbval):
                out_string += obj(core.value(lldbval))
            return out_string

        # Publish the wrapper at module scope so lldb can find it by name.
        myglobals = globals()
        myglobals[summary_function_name] = _internal_summary_function
        summary_function = myglobals[summary_function_name]
        summary_function.__doc__ = obj.__doc__

        global lldb_summary_definitions
        for single_type in types_list:
            if config['showTypeSummary']:
                if single_type in lldb_summary_definitions:
                    lldb.debugger.HandleCommand("type summary delete --category kernel \"" + single_type + "\"")
                lldb.debugger.HandleCommand("type summary add \"" + single_type + "\" --category kernel --python-function " + MODULE_NAME + "." + summary_function_name)
            lldb_summary_definitions[single_type] = obj
        return obj
    return _get_summary

#
# Exception handling from commands
#

_LLDB_WARNING = (
    "********************* LLDB found an exception *********************\n"
    "{lldb_version}\n\n"
    "  There has been an uncaught exception.\n"
    "  It could be because the debugger was disconnected.\n"
    "\n"
    "  In order to debug the macro being run, run the macro again\n"
    "  with the `--debug` flag to get richer information, for example:\n"
    "\n"
    "      (lldb) showtask --debug 0x1234\n"
    "\n"
    "  In order to file a bug report instead, run the macro again\n"
    "  with the `--radar` flag which will produce a tarball to\n"
    "  attach to your report, for example:\n"
    "\n"
    "      (lldb) showtask --radar 0x1234\n"
    "********************************************************************\n"
)

def _format_exc(exc, vt):
    """ Render an exception traceback with per-frame locals and source context,
        colorized via the 'vt' terminal-attribute object (or NOVT for plain text).
        params: exc - Exception instance to format
                vt  - object providing color/style attributes (Bold, Reset, ...)
        returns: str - multi-line formatted traceback
    """
    import traceback, textwrap

    out_str = ""
    w = textwrap.TextWrapper(width=100, placeholder="...", max_lines=3)
    tb = traceback.TracebackException.from_exception(exc, limit=None, lookup_lines=True, capture_locals=True)

    for frame in tb.stack:
        out_str += (
            f"File \"{vt.DarkBlue}{frame.filename}\"{vt.Magenta}@{frame.lineno}{vt.Reset} "
            f"in {vt.Bold}{vt.DarkCyan}{frame.name}{vt.Reset}\n"
        )
        out_str += " Locals:\n"
        for name, value in frame.locals.items():
            variable = f" {vt.Bold}{vt.DarkGreen}{name}{vt.Reset} = "
            first = True
            # Wrap long repr()s; continuation lines are indented under the value.
            for wline in w.wrap(str(value)):
                if first:
                    out_str += variable + f"{vt.Oblique}{wline}\n"
                    first = False
                else:
                    out_str += " " * (len(name) + 7) + wline + "\n"
            out_str += vt.EndOblique
        out_str += " " + "-" * 100 + "\n"

        # Show +/- 2 lines of source around the frame's line when readable.
        try:
            src = open(frame.filename, "r")
        except IOError:
            out_str += " < Sources not available >\n"
        else:
            with src:
                lines = src.readlines()
            startline = frame.lineno - 3 if frame.lineno > 2 else 0
            endline = min(frame.lineno + 2, len(lines))
            for lineno in range(startline, endline):
                if lineno + 1 == frame.lineno:
                    fmt = vt.Bold + vt.Default
                    marker = '>'
                else:
                    fmt = vt.Default
                    marker = ' '
                out_str += f"{fmt} {marker} {lineno + 1:5} {lines[lineno].rstrip()}{vt.Reset}\n"
        out_str += " " + "-" * 100 + "\n"
        out_str += "\n"
    return out_str

_RADAR_URL = "rdar://new/problem?title=LLDB%20macro%20failed%3A%20{}&attachments={}"

def diagnostic_report(exc, stream, cmd_name, debug_opts, lldb_log_fname=None):
    """ Collect diagnostic report for radar submission.

        @param exc (Exception type)
            Exception being reported.

        @param stream (OutputObject)
            Command's output stream to support formattting.

        @param cmd_name (str)
            Name of command being executed.

        @param debug_opts ([str])
            List of active debugging options (--debug, --radar, --pdb)

        @param lldb_log_fname (str)
            LLDB log file name to collect (optional)
    """
    # Print prologue common to all exceptions handling modes.
    print(stream.VT.DarkRed + _LLDB_WARNING.format(lldb_version=lldb.SBDebugger.GetVersionString()))
    print(stream.VT.Bold + stream.VT.DarkGreen + type(exc).__name__ +
          stream.VT.Default + ": {}".format(str(exc)) + stream.VT.Reset)
    print()

    if not debug_opts:
        raise exc

    #
    # Display enhanced diagnostics when requested.
    #
    if "--debug" in debug_opts:
        # Format exception for terminal
        print(_format_exc(exc, stream.VT))
        print("version:")
        print(lldb.SBDebugger.GetVersionString())
        print()

    #
    # Construct tar.gz bundle for radar attachement
    #
    if "--radar" in debug_opts:
        import tarfile, urllib.parse

        print("Creating radar bundle ...")
        itime = int(time.time())
        tar_fname = "/tmp/debug.{:d}.tar.gz".format(itime)

        # FIX: mode "w" produced an *uncompressed* tar despite the .tar.gz
        # name; "w:gz" actually gzip-compresses the bundle.
        with tarfile.open(tar_fname, "w:gz") as tar:
            # Collect LLDB log. It can't be unlinked here because it is still used
            # for the whole duration of xnudebug debug enable.
            if lldb_log_fname is not None:
                print(" Adding {}".format(lldb_log_fname))
                tar.add(lldb_log_fname, "radar/lldb.log")
                os.unlink(lldb_log_fname)

            # Collect traceback (plain-text formatting via NOVT).
            tb_fname = "/tmp/tb.{:d}.log".format(itime)
            print(" Adding {}".format(tb_fname))
            with open(tb_fname, "w") as f:
                f.write(f"{type(exc).__name__}: {str(exc)}\n\n")
                f.write(_format_exc(exc, NOVT()))
                f.write("version:\n")
                f.write(f"{lldb.SBDebugger.GetVersionString()}\n")
            tar.add(tb_fname, "radar/traceback.log")
            os.unlink(tb_fname)

        # Radar submission
        print()
        print(stream.VT.DarkRed + "Please attach {} to your radar or open the URL below:".format(tar_fname) + stream.VT.Reset)
        print()
        print(" " + _RADAR_URL.format(urllib.parse.quote(cmd_name), urllib.parse.quote(tar_fname)))
        print()

    # Enter pdb when requested.
    if "--pdb" in debug_opts:
        print("Starting debugger ...")
        import pdb
        pdb.post_mortem(exc.__traceback__)

    return False

#global cache of documentation for lldb commands exported by this module
#DONOTTOUCHME: Exclusive use of lldb_command only.
lldb_command_documentation = {}

# Debugging flags recognized by every command and stripped before parsing.
_DEBUG_OPTS = {"--debug", "--radar", "--pdb"}

def lldb_command(cmd_name, option_string='', fancy=False):
    """ A function decorator to define a command with name 'cmd_name' in the lldb scope to call python function.
        params: cmd_name - str : name of command to be set in lldb prompt.
                option_string - str: getopt like option string. Only CAPITAL LETTER options allowed.
                                     see README on Customizing command options.
                fancy - bool : whether the command will receive an 'O' object to do fancy output (tables, indent, color)
    """
    if option_string != option_string.upper():
        raise RuntimeError("Cannot setup command with lowercase option args. %s" % option_string)

    def _cmd(obj):
        def _internal_command_function(debugger, command, exe_ctx, result, internal_dict):
            global config, lldb_run_command_state
            stream = CommandOutput(cmd_name, result)
            # need to avoid printing on stdout if called from lldb_run_command.
            if 'active' in lldb_run_command_state and lldb_run_command_state['active']:
                debuglog('Running %s from lldb_run_command' % command)
            else:
                result.SetImmediateOutputFile(sys.__stdout__)

            command_args = shlex.split(command)
            lldb.debugger.HandleCommand('type category disable kernel')
            def_verbose_level = config['verbosity']

            # Filter out debugging arguments and enable logging
            debug_opts = [opt for opt in command_args if opt in _DEBUG_OPTS]
            command_args = [opt for opt in command_args if opt not in _DEBUG_OPTS]

            lldb_log_filename = None
            if "--radar" in debug_opts:
                lldb_log_filename = "/tmp/lldb.{:d}.log".format(int(time.time()))
                lldb_run_command("log enable --file {:s} lldb api".format(lldb_log_filename))
                lldb_run_command("log enable --file {:s} gdb-remote packets".format(lldb_log_filename))
                lldb_run_command("log enable --file {:s} kdp-remote packets".format(lldb_log_filename))

            try:
                stream.setOptions(command_args, option_string)
                if stream.verbose_level != 0:
                    config['verbosity'] += stream.verbose_level
                with RedirectStdStreams(stdout=stream), caching.ImplicitContext(exe_ctx):
                    args = {'cmd_args': stream.target_cmd_args}
                    if option_string:
                        args['cmd_options'] = stream.target_cmd_options
                    if fancy:
                        args['O'] = stream
                    obj(**args)
            except KeyboardInterrupt:
                print("Execution interrupted by user")
            except ArgumentError as arg_error:
                # "HELP" is the sentinel used for -h; it is not an error.
                if str(arg_error) != "HELP":
                    print("Argument Error: " + str(arg_error))
                print("{0:s}:\n {1:s}".format(cmd_name, obj.__doc__.strip()))
                return False
            except Exception as exc:
                if "--radar" in debug_opts:
                    lldb_run_command("log disable")
                return diagnostic_report(exc, stream, cmd_name, debug_opts, lldb_log_filename)

            if config['showTypeSummary']:
                lldb.debugger.HandleCommand('type category enable kernel')

            if stream.pluginRequired:
                plugin = LoadXNUPlugin(stream.pluginName)
                if plugin is None:
                    print("Could not load plugins." + stream.pluginName)
                    return
                plugin.plugin_init(kern, config, lldb, kern.IsDebuggerConnected())
                return_data = plugin.plugin_execute(cmd_name, result.GetOutput())
                ProcessXNUPluginResult(return_data)
                plugin.plugin_cleanup()

            #restore the verbose level after command is complete
            config['verbosity'] = def_verbose_level
            return

        # Publish the command wrapper at module scope under <Name>Command so
        # lldb's "command script add -f" can resolve it.
        myglobals = globals()
        command_function_name = obj.__name__ + "Command"
        myglobals[command_function_name] = _internal_command_function
        command_function = myglobals[command_function_name]
        if not obj.__doc__:
            print("ERROR: Cannot register command({:s}) without documentation".format(cmd_name))
            return obj
        obj.__doc__ += "\n" + COMMON_HELP_STRING
        command_function.__doc__ = obj.__doc__

        global lldb_command_documentation
        if cmd_name in lldb_command_documentation:
            lldb.debugger.HandleCommand("command script delete " + cmd_name)
        lldb_command_documentation[cmd_name] = (obj.__name__, obj.__doc__.lstrip(), option_string)
        lldb.debugger.HandleCommand("command script add -f " + MODULE_NAME + "." + command_function_name + " " + cmd_name)

        setattr(obj, 'fancy', fancy)
        if fancy:
            # Fancy commands invoked directly from python get a default stream.
            def wrapped_fun(cmd_args=None, cmd_options={}, O=None):
                if O is None:
                    stream = CommandOutput(cmd_name, fhandle=sys.stdout)
                    with RedirectStdStreams(stdout=stream):
                        return obj(cmd_args, cmd_options, O=stream)
                else:
                    return obj(cmd_args, cmd_options, O)
            return wrapped_fun
        return obj
    return _cmd

def lldb_alias(alias_name, cmd_line):
    """ define an alias in the lldb command line.
        A programatic way of registering an alias. This basically does
        (lldb)command alias alias_name "cmd_line"
        ex.
        lldb_alias('readphys16', 'readphys 16')
    """
    alias_name = alias_name.strip()
    cmd_line = cmd_line.strip()
    lldb.debugger.HandleCommand("command alias " + alias_name + " " + cmd_line)

def SetupLLDBTypeSummaries(reset=False):
    """ (Re-)register every summary recorded in lldb_summary_definitions with
        lldb's "kernel" type category; optionally delete the category first.
    """
    global lldb_summary_definitions, MODULE_NAME
    if reset:
        lldb.debugger.HandleCommand("type category delete kernel ")
    for single_type in list(lldb_summary_definitions.keys()):
        summary_function = lldb_summary_definitions[single_type]
        lldb_cmd = "type summary add \"" + single_type + "\" --category kernel --python-function " + MODULE_NAME + ".LLDBSummary" + summary_function.__name__
        debuglog(lldb_cmd)
        lldb.debugger.HandleCommand(lldb_cmd)
    if config['showTypeSummary']:
        lldb.debugger.HandleCommand("type category enable kernel")
    else:
        lldb.debugger.HandleCommand("type category disable kernel")
    return

def LoadXNUPlugin(name):
    """ Try to load a plugin from the plugins directory.
    """
    retval = None
    name = name.strip()
    try:
        # FIX: import level -1 is Python-2-only and raises ValueError on
        # Python 3; level 0 requests the normal absolute import.
        module_obj = __import__('plugins.' + name, globals(), locals(), [], 0)
        module_obj = module_obj.__dict__[name]
        defs = dir(module_obj)
        if 'plugin_init' in defs and 'plugin_execute' in defs and 'plugin_cleanup' in defs:
            retval = module_obj
        else:
            print("Plugin is not correctly implemented. Please read documentation on implementing plugins")
    except Exception:
        # FIX: narrowed from a bare except so Ctrl-C still interrupts.
        print("plugin not found :" + name)

    return retval

def ProcessXNUPluginResult(result_data):
    """ Look at the returned data from plugin and see if anymore actions are required or not
        params: result_data - list of format (status, out_string, more_commands)
    """
    ret_status = result_data[0]
    ret_string = result_data[1]
    ret_commands = result_data[2]

    if not ret_status:
        print("Plugin failed: " + ret_string)
        return
    print(ret_string)
    # FIX: the original checked `len(ret_commands) >= 0`, which is always
    # true; the intended guard is a non-empty command list.
    if len(ret_commands) > 0:
        for cmd in ret_commands:
            print("Running command on behalf of plugin:" + cmd)
            lldb.debugger.HandleCommand(cmd)
    return

# holds tests registered with xnu.
#DONOTTOUCHME: Exclusive use of xnudebug_test only
lldb_command_tests = {}

def xnudebug_test(test_name):
    """ A function decoratore to register a test with the framework. Each test is
        supposed to be of format
        def Test<name>(kernel_target, config, lldb_obj, isConnected )

        NOTE: The testname should start with "Test" else exception will be raised.
    """
    def _test(obj):
        global lldb_command_tests
        if obj.__name__.find("Test") != 0:
            print("Test name ", obj.__name__, " should start with Test")
            raise ValueError
        lldb_command_tests[test_name] = (test_name, obj.__name__, obj, obj.__doc__)
        return obj
    return _test

# End Debugging specific utility functions

# Kernel Debugging specific classes and accessor methods

# global access object for target kernel

def GetObjectAtIndexFromArray(array_base, index):
    """ Subscript indexing for arrays that are represented in C as pointers.
        for ex. int *arr = malloc(20*sizeof(int));
        now to get 3rd int from 'arr' you'd do
        arr[2] in C
        GetObjectAtIndexFromArray(arr_val,2)
        params:
            array_base : core.value - representing a pointer type (ex. base of type 'ipc_entry *')
            index : int - 0 based index into the array
        returns:
            core.value : core.value of the same type as array_base_val but pointing to index'th element
    """
    array_base_val = array_base.GetSBValue()
    base_address = array_base_val.GetValueAsUnsigned()
    # element size is the pointee's size; address = base + index * size
    size = array_base_val.GetType().GetPointeeType().GetByteSize()
    obj_address = base_address + (index * size)
    obj = kern.GetValueFromAddress(obj_address, array_base_val.GetType().name)
    return Cast(obj, array_base_val.GetType())

# Populated during macro setup (outside this view); the KernelTarget accessor.
kern: KernelTarget = None

def GetLLDBThreadForKernelThread(thread_obj):
    """ Get a reference to lldb.SBThread representation for kernel thread.
        params:
            thread_obj : core.cvalue - thread object of type thread_t
        returns
            lldb.SBThread - lldb thread object for getting backtrace/registers etc.
    """
    tid = unsigned(thread_obj.thread_id)
    lldb_process = LazyTarget.GetProcess()
    sbthread = lldb_process.GetThreadByID(tid)
    if not sbthread.IsValid():
        # in case lldb doesnt know about this thread, create one
        if hasattr(lldb_process, "CreateOSPluginThread"):
            debuglog("creating os plugin thread on the fly for {0:d} 0x{1:x}".format(tid, thread_obj))
            lldb_process.CreateOSPluginThread(tid, unsigned(thread_obj))
        else:
            raise RuntimeError("LLDB process does not support CreateOSPluginThread.")
        sbthread = lldb_process.GetThreadByID(tid)

    if not sbthread.IsValid():
        raise RuntimeError("Unable to find lldb thread for tid={0:d} thread = {1:#018x} (#16049947: have you put 'settings set target.load-script-from-symbol-file true' in your .lldbinit?)".format(tid, thread_obj))

    return sbthread

def GetKextSymbolInfo(load_addr):
    """ Get a string descriptiong load_addr + offset
        params:
            load_addr - int address value of pc in backtrace.
        returns: str - kext name + offset string. If no cached data available, warning message is returned.
    """
    symbol_name = "None"
    symbol_offset = load_addr
    kmod_val = kern.globals.kmod
    if not kern.arch.startswith('arm64'):
        # Non-arm64: walk the kmod linked list for a containing address range.
        for kval in IterateLinkedList(kmod_val, 'next'):
            if load_addr >= unsigned(kval.address) and \
                load_addr <= (unsigned(kval.address) + unsigned(kval.size)):
                symbol_name = kval.name
                symbol_offset = load_addr - unsigned(kval.address)
                break
        return "{:#018x} {:s} + {:#x} \n".format(load_addr, symbol_name, symbol_offset)

    # only for arm64 we do lookup for split kexts.
    if not GetAllKextSummaries.cached():
        if str(GetConnectionProtocol()) != "core":
            return "{:#018x} ~ kext info not available. please run 'showallkexts' once ~ \n".format(load_addr)

    for kval in GetAllKextSummaries():
        text_seg = text_segment(kval.segments)
        if load_addr >= text_seg.vmaddr and \
            load_addr <= (text_seg.vmaddr + text_seg.vmsize):
            symbol_name = kval.name
            symbol_offset = load_addr - text_seg.vmaddr
            break
    return "{:#018x} {:s} + {:#x} \n".format(load_addr, symbol_name, symbol_offset)

def GetThreadBackTrace(thread_obj, verbosity=vHUMAN, prefix=""):
    """ Get a string to display back trace for a thread.
        params:
            thread_obj - core.cvalue : a thread object of type thread_t.
            verbosity - int : either of vHUMAN, vSCRIPT or vDETAIL to describe the verbosity of output
            prefix - str : a string prefix added before the line for each frame.
            isContinuation - bool : is thread a continuation?
        returns:
            str - a multi line string showing each frame in backtrace.
    """
    # A thread with no kernel stack is blocked on a continuation.
    is_continuation = not bool(unsigned(thread_obj.kernel_stack))
    thread_val = GetLLDBThreadForKernelThread(thread_obj)
    out_string = ""

    kernel_stack = unsigned(thread_obj.kernel_stack)
    reserved_stack = unsigned(thread_obj.reserved_stack)

    if not is_continuation:
        if kernel_stack and reserved_stack:
            out_string += prefix + "reserved_stack = {:#018x}\n".format(reserved_stack)
        out_string += prefix + "kernel_stack = {:#018x}\n".format(kernel_stack)
    else:
        out_string += prefix + "continuation ="

    iteration = 0
    last_frame_p = 0
    for frame in thread_val.frames:
        addr = frame.GetPCAddress()
        load_addr = addr.GetLoadAddress(LazyTarget.GetTarget())
        function = frame.GetFunction()
        frame_p = frame.GetFP()
        mod_name = frame.GetModule().GetFileSpec().GetFilename()

        if iteration == 0 and not is_continuation:
            out_string += prefix + "stacktop = {:#018x}\n".format(frame_p)

        if not function:
            # No debug info for 'function'.
            out_string += prefix
            if not is_continuation:
                out_string += "{fp:#018x} ".format(fp=frame_p)

            symbol = frame.GetSymbol()
            if not symbol:
                # Not even a symbol: fall back to kext name + offset.
                out_string += GetKextSymbolInfo(load_addr)
            else:
                file_addr = addr.GetFileAddress()
                start_addr = symbol.GetStartAddress().GetFileAddress()
                symbol_name = symbol.GetName()
                symbol_offset = file_addr - start_addr
                out_string += "{addr:#018x} {mod}`{symbol} + {offset:#x} \n".format(addr=load_addr, mod=mod_name, symbol=symbol_name, offset=symbol_offset)
        else:
            # Debug info is available for 'function'.
            func_name = frame.GetFunctionName()
            # file_name = frame.GetLineEntry().GetFileSpec().GetFilename()
            # line_num = frame.GetLineEntry().GetLine()
            func_name = '%s [inlined]' % func_name if frame.IsInlined() else func_name
            if is_continuation and frame.IsInlined():
                debuglog("Skipping frame for thread {:#018x} since its inlined".format(thread_obj))
                continue
            out_string += prefix
            if not is_continuation:
                out_string += "{fp:#018x} ".format(fp=frame_p)
            if len(frame.arguments) > 0:
                strargs = "(" + str(frame.arguments).replace('\n', ', ') + ")"
                out_string += "{addr:#018x} {func}{args} \n".format(addr=load_addr, func=func_name, args=strargs)
            else:
                out_string += "{addr:#018x} {func}(void) \n".format(addr=load_addr, func=func_name)
        iteration += 1
        if frame_p:
            last_frame_p = frame_p

    if not is_continuation and last_frame_p:
        out_string += prefix + "stackbottom = {:#018x}".format(last_frame_p)
    # lldb emits this placeholder for optimized-out values; drop it.
    out_string = out_string.replace("variable not available", "")
    return out_string

def GetSourceInformationForAddress(addr):
    """ convert and address to function +offset information.
        params: addr - int address in the binary to be symbolicated
        returns: string of format "0xaddress: function + offset"
    """
    try:
        return str(kern.SymbolicateFromAddress(addr, fullSymbol=True)[0])
    except:
        # symbolication failed; show the raw address instead
        return '{0:<#x} '.format(addr)

def GetFrameLocalVariable(variable_name, frame_no=0):
    """ Find a local variable by name
        params:
            variable_name: str - name of variable to search for
        returns:
            core.value - if the variable is found.
            None   - if not found or not Valid
    """
    retval = None
    sbval = None
    lldb_SBThread = LazyTarget.GetProcess().GetSelectedThread()
    frame = lldb_SBThread.GetSelectedFrame()
    if frame_no:
        frame = lldb_SBThread.GetFrameAtIndex(frame_no)
    if frame:
        sbval = frame.FindVariable(variable_name)
    if sbval and sbval.IsValid():
        retval = core.cvalue.value(sbval)
    return retval

# Begin Macros for kernel debugging

@lldb_command('kgmhelp')
def KernelDebugCommandsHelp(cmd_args=None):
    """ Show a list of registered commands for kenel debugging.
    """
    global lldb_command_documentation
    print("List of commands provided by " + MODULE_NAME + " for kernel debugging.")
    cmds = list(lldb_command_documentation.keys())
    cmds.sort()
    for cmd in cmds:
        # last tuple element is the option string; a str there means a
        # properly registered command with a help line.
        if isinstance(lldb_command_documentation[cmd][-1], str):
            print(" {0: <20s} - {1}".format(cmd, lldb_command_documentation[cmd][1].split("\n")[0].strip()))
        else:
            print(" {0: <20s} - {1}".format(cmd, "No help string found."))
    print('Each of the functions listed here accept the following common options. ')
    print(COMMON_HELP_STRING)
    print('Additionally, each command implementation may have more options. "(lldb) help " will show these options.')
    return None

@lldb_command('showraw')
def ShowRawCommand(cmd_args=None):
    """ A command to disable the kernel summaries and show data as seen by the system.
        This is useful when trying to read every field of a struct as compared to brief summary
    """
    command = " ".join(cmd_args)
    lldb.debugger.HandleCommand('type category disable kernel')
    lldb.debugger.HandleCommand(command)
    lldb.debugger.HandleCommand('type category enable kernel')

@lldb_command('xnudebug')
def XnuDebugCommand(cmd_args=None):
    """ command interface for operating on the xnu macros. Allowed commands are as follows
        reload:
            Reload a submodule from the xnu/tools/lldb directory. Do not include the ".py" suffix in modulename.
            usage: xnudebug reload <modulename> (eg. memory, process, stats etc)
        flushcache:
            remove any cached data held in static or dynamic data cache.
            usage: xnudebug flushcache
        test:
            Start running registered test with <name> from various modules.
            usage: xnudebug test <name> (eg. test_memstats)
        testall:
            Go through all registered tests and run them
        debug:
            Toggle state of debug configuration flag.
        profile:
            Profile an lldb command and write its profile info to a file.
            usage: xnudebug profile <output file> <lldb command>
            e.g. `xnudebug profile /tmp/showallstacks_profile.prof showallstacks`
        coverage:
            Collect coverage for an lldb command and save it to a file.
            usage: xnudebug coverage <output file> <lldb command>
            e.g. `xnudebug coverage /tmp/showallstacks_coverage.cov showallstacks`
            An HTML report can then be generated via `coverage html --data-file=<saved file>`
    """
    global config
    command_args = cmd_args
    if len(command_args) == 0:
        raise ArgumentError("No command specified.")
    supported_subcommands = ['debug', 'reload', 'test', 'testall', 'flushcache', 'profile', 'coverage']
    subcommand = GetLongestMatchOption(command_args[0], supported_subcommands, True)

    if len(subcommand) == 0:
        raise ArgumentError("Subcommand (%s) is not a valid command. " % str(command_args[0]))

    subcommand = subcommand[0].lower()
    if subcommand == 'debug':
        # 'enable'/'disable' is inferred from the presence of "dis" in the arg
        if command_args[-1].lower().find('dis') >= 0 and config['debug']:
            config['debug'] = False
            print("Disabled debug logging.")
        elif command_args[-1].lower().find('dis') < 0 and not config['debug']:
            config['debug'] = True
            EnableLLDBAPILogging()  # provided by utils.py
            print("Enabled debug logging. \nPlease run 'xnudebug debug disable' to disable it again. ")

    if subcommand == 'flushcache':
        print("Current size of cache: {}".format(caching.GetSizeOfCache()))
        caching.ClearAllCache()

    if subcommand == 'reload':
        module_name = command_args[-1]
        if module_name in sys.modules:
            reload(sys.modules[module_name])
            print(module_name + " is reloaded from " + sys.modules[module_name].__file__)
        else:
            print("Unable to locate module named ", module_name)

    if subcommand == 'testall':
        for test_name in list(lldb_command_tests.keys()):
            print("[BEGIN]", test_name)
            res = lldb_command_tests[test_name][2](kern, config, lldb, True)
            if res:
                print("[PASSED] {:s}".format(test_name))
            else:
                print("[FAILED] {:s}".format(test_name))

    if subcommand == 'test':
        test_name = command_args[-1]
        if test_name in lldb_command_tests:
            test = lldb_command_tests[test_name]
            print("Running test {:s}".format(test[0]))
            if test[2](kern, config, lldb, True):
                print("[PASSED] {:s}".format(test[0]))
            else:
                print("[FAILED] {:s}".format(test[0]))
            return ""
        else:
            print("No such test registered with name: {:s}".format(test_name))
            print("XNUDEBUG Available tests are:")
            for i in list(lldb_command_tests.keys()):
                print(i)
        return None

    if subcommand == 'profile':
        save_path = command_args[1]
        import cProfile, pstats, io
        pr = cProfile.Profile()
        pr.enable()
        # everything after the output path is the command to profile
        lldb.debugger.HandleCommand(" ".join(command_args[2:]))
        pr.disable()
        pr.dump_stats(save_path)
        print("")
        print("=" * 80)
        print("")
        s = io.StringIO()
        ps = pstats.Stats(pr, stream=s)
        ps.strip_dirs()
        ps.sort_stats('cumulative')
        ps.print_stats(30)
        print(s.getvalue().rstrip())
        print("")
        print(f"Profile info saved to \"{save_path}\"")

    if subcommand == 'coverage':
        coverage_module = find_spec('coverage')
        if not coverage_module:
            print("Missing 'coverage' module. Please install it for the interpreter currently running.`")
            return
        save_path = command_args[1]
        import coverage
        cov = coverage.Coverage(data_file=save_path)
        cov.start()
        lldb.debugger.HandleCommand(" ".join(command_args[2:]))
        cov.stop()
        cov.save()
        print(cov.report())
        print(f"Coverage info saved to: \"{save_path}\"")

    return False

@lldb_command('showversion')
def ShowVersion(cmd_args=None):
    """ Read the kernel version string from a fixed address in low memory. Useful if you don't know which kernel
        is on the other end, and need to find the appropriate symbols. Beware that if you've loaded a symbol file, but aren't
        connected to a remote target, the version string from the symbol file will be displayed instead. This macro expects to be
        connected to the remote kernel to function correctly.
    """
    print(kern.version)

def ProcessPanicStackshot(panic_stackshot_addr, panic_stackshot_len, cmd_options):
    """ Process the panic stackshot from the panic header, saving it to a file if it is valid
        params: panic_stackshot_addr : start address of the panic stackshot binary data
                panic_stackshot_len : length of the stackshot binary data
        returns: nothing
    """
    if not panic_stackshot_addr:
        print("No panic stackshot available (invalid addr)")
        return

    if not panic_stackshot_len:
        print("No panic stackshot available (zero length)")
        return;

    # -D selects (and creates if needed) the output directory; default /tmp.
    if "-D" in cmd_options:
        dir_ = cmd_options["-D"]
        if os.path.exists(dir_):
            if not os.access(dir_, os.W_OK):
                print("Write access to {} denied".format(dir_))
                return
        else:
            try:
                os.makedirs(dir_)
            except OSError as e:
                print("An error occurred {} while creating a folder : {}".format(e, dir_))
                return
    else:
        dir_ = "/tmp"

    id = str(uuid.uuid4())[:8]
    ss_binfile = os.path.join(dir_, "panic_%s.bin" % id)
    ss_ipsfile = os.path.join(dir_, "panic_%s.ips" % id)

    if not SaveDataToFile(panic_stackshot_addr, panic_stackshot_len, ss_binfile, None):
        print("Failed to save stackshot binary data to file")
        return

    from kcdata import decode_kcdata_file
    try:
        with open(ss_binfile, "rb") as binfile:
            decode_kcdata_file(binfile, ss_ipsfile)
            print("Saved ips stackshot file as %s" % ss_ipsfile)
    except Exception as e:
        print("Failed to decode the stackshot: %s" % str(e))

def ParseEmbeddedPanicLog(panic_header, cmd_options={}):
    # Parse and print an embedded-style panic header (eph_* fields); with -S
    # also extract the panic stackshot via ProcessPanicStackshot().
    panic_buf = Cast(panic_header, 'char *')
    panic_log_magic = unsigned(panic_header.eph_magic)
    panic_log_begin_offset = unsigned(panic_header.eph_panic_log_offset)
    panic_log_len = unsigned(panic_header.eph_panic_log_len)
    other_log_begin_offset = unsigned(panic_header.eph_other_log_offset)
    other_log_len = unsigned(panic_header.eph_other_log_len)
    expected_panic_magic = xnudefines.EMBEDDED_PANIC_MAGIC
    panic_stackshot_addr = unsigned(panic_header) + unsigned(panic_header.eph_stackshot_offset)
    panic_stackshot_len = unsigned(panic_header.eph_stackshot_len)
    panic_header_flags = unsigned(panic_header.eph_panic_flags)

    warn_str = ""
    out_str = ""

    if panic_log_magic != 0 and panic_log_magic != expected_panic_magic:
        warn_str += "BAD MAGIC! Found 0x%x expected 0x%x" % (panic_log_magic, expected_panic_magic)

    if warn_str:
        print("\n %s" % warn_str)
        if panic_log_begin_offset == 0:
            return

    if "-S" in cmd_options:
        if panic_header_flags & xnudefines.EMBEDDED_PANIC_STACKSHOT_SUCCEEDED_FLAG:
            ProcessPanicStackshot(panic_stackshot_addr, panic_stackshot_len, cmd_options)
        else:
            print("No panic stackshot available")
    elif "-D" in cmd_options:
        print("-D option must be specified along with the -S option")
        return

    # Copy the panic log, then the "other" log, byte by byte.
    panic_log_curindex = 0
    while panic_log_curindex < panic_log_len:
        p_char = str(panic_buf[(panic_log_begin_offset + panic_log_curindex)])
        out_str += p_char
        panic_log_curindex += 1

    if other_log_begin_offset != 0:
        other_log_curindex = 0
        while other_log_curindex < other_log_len:
            p_char = str(panic_buf[(other_log_begin_offset + other_log_curindex)])
            out_str += p_char
            other_log_curindex += 1

    print(out_str)
    return

def ParseMacOSPanicLog(panic_header, cmd_options={}):
    # Parse and print a macOS-style panic header (mph_* fields); with -S also
    # extract the panic stackshot via ProcessPanicStackshot().
    panic_buf = Cast(panic_header, 'char *')
    panic_log_magic = unsigned(panic_header.mph_magic)
    panic_log_begin_offset = unsigned(panic_header.mph_panic_log_offset)
    panic_log_len = unsigned(panic_header.mph_panic_log_len)
    other_log_begin_offset = unsigned(panic_header.mph_other_log_offset)
    other_log_len = unsigned(panic_header.mph_other_log_len)
    cur_debug_buf_ptr_offset = (unsigned(kern.globals.debug_buf_ptr) - unsigned(panic_header))
    # Extend a missing/short other_log length up to the live debug buffer ptr.
    if other_log_begin_offset != 0 and (other_log_len == 0 or other_log_len < (cur_debug_buf_ptr_offset - other_log_begin_offset)):
        other_log_len = cur_debug_buf_ptr_offset - other_log_begin_offset
    expected_panic_magic = xnudefines.MACOS_PANIC_MAGIC

    # use the global if it's available (on an x86 corefile), otherwise refer to the header
    if hasattr(kern.globals, "panic_stackshot_buf"):
        panic_stackshot_addr = unsigned(kern.globals.panic_stackshot_buf)
        panic_stackshot_len = unsigned(kern.globals.panic_stackshot_len)
    else:
        panic_stackshot_addr = unsigned(panic_header) + unsigned(panic_header.mph_stackshot_offset)
        panic_stackshot_len = unsigned(panic_header.mph_stackshot_len)

    panic_header_flags = unsigned(panic_header.mph_panic_flags)

    warn_str = ""
    out_str = ""

    if panic_log_magic != 0 and panic_log_magic != expected_panic_magic:
        warn_str += "BAD MAGIC! Found 0x%x expected 0x%x" % (panic_log_magic, expected_panic_magic)

    if warn_str:
        print("\n %s" % warn_str)
        if panic_log_begin_offset == 0:
            return

    if "-S" in cmd_options:
        if panic_header_flags & xnudefines.MACOS_PANIC_STACKSHOT_SUCCEEDED_FLAG:
            ProcessPanicStackshot(panic_stackshot_addr, panic_stackshot_len, cmd_options)
        else:
            print("No panic stackshot available")
    elif "-D" in cmd_options:
        print("-D option must be specified along with the -S option")
        return

    panic_log_curindex = 0
    while panic_log_curindex < panic_log_len:
        p_char = str(panic_buf[(panic_log_begin_offset + panic_log_curindex)])
        out_str += p_char
        panic_log_curindex += 1

    if other_log_begin_offset != 0:
        other_log_curindex = 0
        while other_log_curindex < other_log_len:
            p_char = str(panic_buf[(other_log_begin_offset + other_log_curindex)])
            out_str += p_char
            other_log_curindex += 1

    print(out_str)
    return

def ParseAURRPanicLog(panic_header, cmd_options={}):
    # Parse and print an AURR/MacEFI or Crashlog panic header; falls back to
    # ParseUnknownPanicLog() for unrecognized versions.
    reset_cause = {
        0x0: "OTHER",
        0x1: "CATERR",
        0x2: "SWD_TIMEOUT",
        0x3: "GLOBAL RESET",
        0x4: "STRAIGHT TO S5",
    }

    expected_panic_magic = xnudefines.AURR_PANIC_MAGIC

    panic_buf = Cast(panic_header, 'char *')

    try:
        # This line will blow up if there's not type info for this struct (older kernel)
        # We fall back to manual parsing below
        aurr_panic_header = Cast(panic_header, 'struct efi_aurr_panic_header *')
        panic_log_magic = unsigned(aurr_panic_header.efi_aurr_magic)
        panic_log_version = unsigned(aurr_panic_header.efi_aurr_version)
        panic_log_reset_cause = unsigned(aurr_panic_header.efi_aurr_reset_cause)
        panic_log_reset_log_offset = unsigned(aurr_panic_header.efi_aurr_reset_log_offset)
        panic_log_reset_log_len = unsigned(aurr_panic_header.efi_aurr_reset_log_len)
    except Exception as e:
        print("*** Warning: kernel symbol file has no type information for 'struct efi_aurr_panic_header'...")
        print("*** Warning: trying to manually parse...")
        # Manual layout: [magic, crc, version, reset_cause, log_offset, log_len]
        aurr_panic_header = Cast(panic_header, "uint32_t *")
        panic_log_magic = unsigned(aurr_panic_header[0])
        # panic_log_crc = unsigned(aurr_panic_header[1])
        panic_log_version = unsigned(aurr_panic_header[2])
        panic_log_reset_cause = unsigned(aurr_panic_header[3])
        panic_log_reset_log_offset = unsigned(aurr_panic_header[4])
        panic_log_reset_log_len = unsigned(aurr_panic_header[5])

    if panic_log_magic != 0 and panic_log_magic != expected_panic_magic:
        print("BAD MAGIC! Found 0x%x expected 0x%x" % (panic_log_magic, expected_panic_magic))
        return

    print("AURR Panic Version: %d" % (panic_log_version))

    # When it comes time to extend this in the future, please follow the
    # construct used below in ShowPanicLog()
    if panic_log_version in (xnudefines.AURR_PANIC_VERSION, xnudefines.AURR_CRASHLOG_PANIC_VERSION):
        # AURR Report Version 1 (AURR/MacEFI) or 2 (Crashlog)
        # see macefifirmware/Vendor/Apple/EfiPkg/AppleDebugSupport/Library/Debugger.h
        print("Reset Cause: 0x%x (%s)" % (panic_log_reset_cause, reset_cause.get(panic_log_reset_cause, "UNKNOWN")))

        # Adjust panic log string length (cap to maximum supported values)
        if panic_log_version == xnudefines.AURR_PANIC_VERSION:
            max_string_len = panic_log_reset_log_len
        elif panic_log_version == xnudefines.AURR_CRASHLOG_PANIC_VERSION:
            max_string_len = xnudefines.CRASHLOG_PANIC_STRING_LEN

        panic_str_offset = 0
        out_str = ""

        while panic_str_offset < max_string_len:
            p_char = str(panic_buf[panic_log_reset_log_offset + panic_str_offset])
            out_str += p_char
            panic_str_offset += 1

        print(out_str)

        # Save Crashlog Binary Data (if available)
        if "-S" in cmd_options and panic_log_version == xnudefines.AURR_CRASHLOG_PANIC_VERSION:
            # Binary payload follows the fixed-size panic string region.
            crashlog_binary_offset = panic_log_reset_log_offset + xnudefines.CRASHLOG_PANIC_STRING_LEN
            crashlog_binary_size = (panic_log_reset_log_len > xnudefines.CRASHLOG_PANIC_STRING_LEN) and (panic_log_reset_log_len - xnudefines.CRASHLOG_PANIC_STRING_LEN) or 0

            if 0 == crashlog_binary_size:
                print("No crashlog data found...")
                return

            # Save to file
            ts = int(time.time())
            ss_binfile = "/tmp/crashlog_%d.bin" % ts

            if not SaveDataToFile(panic_buf + crashlog_binary_offset, crashlog_binary_size, ss_binfile, None):
                print("Failed to save crashlog binary data to file")
                return
    else:
        return ParseUnknownPanicLog(panic_header, cmd_options)

    return

def ParseUnknownPanicLog(panic_header, cmd_options={}):
    # Last-resort handler: report the unknown magic and tell the user how to
    # dump the raw panic region for manual inspection.
    magic_ptr = Cast(panic_header, 'uint32_t *')
    panic_log_magic = dereference(magic_ptr)
    print("Unrecognized panic header format. Magic: 0x%x..." % unsigned(panic_log_magic))
    print("Panic region starts at 0x%08x" % int(panic_header))
    print("Hint: To dump this panic header in order to try manually parsing it, use this command:")
    print(" (lldb) memory read -fx -s4 -c64 0x%08x" % int(panic_header))
    print(" ^ that will dump the first 256 bytes of the panic region")
    ## TBD: Hexdump some bits here to allow folks to poke at the region manually?
    return

@lldb_command('paniclog', 'SMD:')
def ShowPanicLog(cmd_args=None, cmd_options={}):
    """ Display the paniclog information
        usage: (lldb) paniclog
        options:
            -v : increase verbosity
            -S : parse stackshot data (if panic stackshot available)
            -D : Takes a folder name for stackshot. This must be specified along with the -S option.
-M : parse macOS panic area (print panic string (if available), and/or capture crashlog info) -E : Takes a file name and redirects the ext paniclog output to the file """ if "-M" in cmd_options: if not hasattr(kern.globals, "mac_panic_header"): print("macOS panic data requested but unavailable on this device") return panic_header = kern.globals.mac_panic_header # DEBUG HACK FOR TESTING #panic_header = kern.GetValueFromAddress(0xfffffff054098000, "uint32_t *") else: panic_header = kern.globals.panic_info if hasattr(panic_header, "eph_magic"): panic_log_magic = unsigned(panic_header.eph_magic) elif hasattr(panic_header, "mph_magic"): panic_log_magic = unsigned(panic_header.mph_magic) else: print("*** Warning: unsure of panic header format, trying anyway") magic_ptr = Cast(panic_header, 'uint32_t *') panic_log_magic = int(dereference(magic_ptr)) if panic_log_magic == 0: # No panic here.. return panic_parsers = { int(xnudefines.AURR_PANIC_MAGIC) : ParseAURRPanicLog, int(xnudefines.MACOS_PANIC_MAGIC) : ParseMacOSPanicLog, int(xnudefines.EMBEDDED_PANIC_MAGIC) : ParseEmbeddedPanicLog, } # Find the right parser (fall back to unknown parser above) parser = panic_parsers.get(panic_log_magic, ParseUnknownPanicLog) # execute it return parser(panic_header, cmd_options) @lldb_command('extpaniclog', 'F:') def ProcessExtensiblePaniclog(cmd_args=None, cmd_options={}): """ Write the extensible paniclog information to a file usage: (lldb) paniclog options: -F : Output file name """ if not "-F" in cmd_options: print("Output file name is needed: Use -F") return panic_header = kern.globals.panic_info process = LazyTarget().GetProcess() error = lldb.SBError() EXT_PANICLOG_MAX_SIZE = 32 # 32 is the max size of the string in Data ID ext_paniclog_len = unsigned(panic_header.eph_ext_paniclog_len) if ext_paniclog_len == 0: print("Cannot find extensible paniclog") return ext_paniclog_addr = unsigned(panic_header) + unsigned(panic_header.eph_ext_paniclog_offset) ext_paniclog_bytes = 
process.chkReadMemory(ext_paniclog_addr, ext_paniclog_len); idx = 0; ext_paniclog_ver_bytes = ext_paniclog_bytes[idx:idx+sizeof('uint32_t')] ext_paniclog_ver = int.from_bytes(ext_paniclog_ver_bytes, 'little') idx += sizeof('uint32_t') no_of_logs_bytes = ext_paniclog_bytes[idx:idx+sizeof('uint32_t')] no_of_logs = int.from_bytes(no_of_logs_bytes, 'little') idx += sizeof('uint32_t') ext_paniclog = dict() logs_processed = 0 for _ in range(no_of_logs): uuid_bytes = ext_paniclog_bytes[idx:idx+sizeof('uuid_t')] ext_uuid = str(uuid.UUID(bytes=uuid_bytes)) idx += sizeof('uuid_t') flags_bytes = ext_paniclog_bytes[idx:idx+sizeof('uint32_t')] flags = int.from_bytes(flags_bytes, 'little') idx += sizeof('ext_paniclog_flags_t') data_id_bytes = ext_paniclog_bytes[idx:idx + EXT_PANICLOG_MAX_SIZE].split(b'\0')[0] data_id = data_id_bytes.decode('utf-8') data_id_len = len(data_id_bytes) idx += data_id_len + 1 data_len_bytes = ext_paniclog_bytes[idx:idx+sizeof('uint32_t')] data_len = int.from_bytes(data_len_bytes, 'little') idx += sizeof('uint32_t') data_bytes = ext_paniclog_bytes[idx:idx+data_len] data = base64.b64encode(data_bytes).decode('ascii') idx += data_len temp_dict = dict(Data_Id=data_id, Data=data) ext_paniclog.setdefault(ext_uuid, []).append(temp_dict) logs_processed += 1 if logs_processed < no_of_logs: print("** Warning: Extensible paniclog might be corrupted **") with open(cmd_options['-F'], 'w') as out_file: out_file.write(json.dumps(ext_paniclog)) print("Wrote extensible paniclog to %s" % cmd_options['-F']) return @lldb_command('showbootargs') def ShowBootArgs(cmd_args=None): """ Display boot arguments passed to the target kernel """ bootargs = Cast(kern.GetGlobalVariable('PE_state').bootArgs, 'boot_args *') bootargs_cmd = bootargs.CommandLine print(str(bootargs_cmd)) # The initialization code to add your commands _xnu_framework_init = False def __lldb_init_module(debugger, internal_dict): global kern, lldb_command_documentation, config, _xnu_framework_init if 
_xnu_framework_init: return _xnu_framework_init = True debugger.HandleCommand('type summary add --regex --summary-string "${var%s}" -C yes -p -v "char *\[[0-9]*\]"') debugger.HandleCommand('type format add --format hex -C yes uintptr_t') debugger.HandleCommand('type format add --format hex -C yes cpumap_t') kern = KernelTarget(debugger) if not hasattr(lldb.SBValue, 'GetValueAsAddress'): warn_str = "WARNING: lldb version is too old. Some commands may break. Please update to latest lldb." if os.isatty(sys.__stdout__.fileno()): warn_str = VT.DarkRed + warn_str + VT.Default print(warn_str) print("xnu debug macros loaded successfully. Run showlldbtypesummaries to enable type summaries.") __lldb_init_module(lldb.debugger, None) @lldb_command("showlldbtypesummaries") def ShowLLDBTypeSummaries(cmd_args=[]): """ Enable/Disable kernel type summaries. Default is disabled. Usage: showlldbtypesummaries [enable|disable] default is enable """ global config action = "enable" trailer_msg = '' if len(cmd_args) > 0 and cmd_args[0].lower().find('disable') >=0: action = "disable" config['showTypeSummary'] = False trailer_msg = "Please run 'showlldbtypesummaries enable' to enable the summary feature." else: config['showTypeSummary'] = True SetupLLDBTypeSummaries(True) trailer_msg = "Please run 'showlldbtypesummaries disable' to disable the summary feature." lldb_run_command("type category "+ action +" kernel") print("Successfully "+action+"d the kernel type summaries. %s" % trailer_msg) @lldb_command('walkqueue_head', 'S') def WalkQueueHead(cmd_args=[], cmd_options={}): """ walk a queue_head_t and list all members in it. Note this is for queue_head_t. refer to osfmk/kern/queue.h Option: -S - suppress summary output. 
Usage: (lldb) walkqueue_head ex: (lldb) walkqueue_head 0x7fffff80 "thread *" "task_threads" """ global lldb_summary_definitions if not cmd_args: raise ArgumentError("invalid arguments") if len(cmd_args) != 3: raise ArgumentError("insufficient arguments") queue_head = kern.GetValueFromAddress(cmd_args[0], 'struct queue_entry *') el_type = cmd_args[1] field_name = cmd_args[2] showsummary = False if el_type in lldb_summary_definitions: showsummary = True if '-S' in cmd_options: showsummary = False for i in IterateQueue(queue_head, el_type, field_name): if showsummary: print(lldb_summary_definitions[el_type](i)) else: print("{0: <#020x}".format(i)) @lldb_command('walklist_entry', 'SE') def WalkList(cmd_args=[], cmd_options={}): """ iterate over a list as defined with LIST_ENTRY in bsd/sys/queue.h params: object addr - value : address of object element_type - str : Type of the next element field_name - str : Name of the field in next element's structure Options: -S - suppress summary output. 
-E - Iterate using SLIST_ENTRYs Usage: (lldb) walklist_entry ex: (lldb) walklist_entry 0x7fffff80 "struct proc *" "p_sibling" """ global lldb_summary_definitions if not cmd_args: raise ArgumentError("invalid arguments") if len(cmd_args) != 3: raise ArgumentError("insufficient arguments") el_type = cmd_args[1] queue_head = kern.GetValueFromAddress(cmd_args[0], el_type) field_name = cmd_args[2] showsummary = False if el_type in lldb_summary_definitions: showsummary = True if '-S' in cmd_options: showsummary = False if '-E' in cmd_options: prefix = 's' else: prefix = '' elt = queue_head while unsigned(elt) != 0: i = elt elt = elt.__getattr__(field_name).__getattr__(prefix + 'le_next') if showsummary: print(lldb_summary_definitions[el_type](i)) else: print("{0: <#020x}".format(i)) def trace_parse_Copt(Copt): """Parses the -C option argument and returns a list of CPUs """ cpusOpt = Copt cpuList = cpusOpt.split(",") chosen_cpus = [] for cpu_num_string in cpuList: try: if '-' in cpu_num_string: parts = cpu_num_string.split('-') if len(parts) != 2 or not (parts[0].isdigit() and parts[1].isdigit()): raise ArgumentError("Invalid cpu specification: %s" % cpu_num_string) firstRange = int(parts[0]) lastRange = int(parts[1]) if firstRange >= kern.globals.real_ncpus or lastRange >= kern.globals.real_ncpus: raise ValueError() if lastRange < firstRange: raise ArgumentError("Invalid CPU range specified: `%s'" % cpu_num_string) for cpu_num in range(firstRange, lastRange + 1): if cpu_num not in chosen_cpus: chosen_cpus.append(cpu_num) else: chosen_cpu = int(cpu_num_string) if chosen_cpu < 0 or chosen_cpu >= kern.globals.real_ncpus: raise ValueError() if chosen_cpu not in chosen_cpus: chosen_cpus.append(chosen_cpu) except ValueError: raise ArgumentError("Invalid CPU number specified. 
Valid range is 0..%d" % (kern.globals.real_ncpus - 1)) return chosen_cpus IDX_CPU = 0 IDX_RINGPOS = 1 IDX_RINGENTRY = 2 def Trace_cmd(cmd_args=[], cmd_options={}, headerString=lambda:"", entryString=lambda x:"", ring='', entries_per_cpu=0, max_backtraces=0): """Generic trace dumper helper function """ if '-S' in cmd_options: field_arg = cmd_options['-S'] try: getattr(kern.PERCPU_GET(ring, 0)[0], field_arg) sort_key_field_name = field_arg except AttributeError: raise ArgumentError("Invalid sort key field name `%s'" % field_arg) else: sort_key_field_name = 'start_time_abs' if '-C' in cmd_options: chosen_cpus = trace_parse_Copt(cmd_options['-C']) else: chosen_cpus = [x for x in range(kern.globals.real_ncpus)] try: limit_output_count = int(cmd_options['-N']) except ValueError: raise ArgumentError("Invalid output count `%s'" % cmd_options['-N']); except KeyError: limit_output_count = None reverse_sort = '-R' in cmd_options backtraces = '-B' in cmd_options # entries will be a list of 3-tuples, each holding the CPU on which the iotrace entry was collected, # the original ring index, and the iotrace entry. 
entries = [] for x in chosen_cpus: ring_slice = [(x, y, kern.PERCPU_GET(ring, x)[y]) for y in range(entries_per_cpu)] entries.extend(ring_slice) total_entries = len(entries) entries.sort(key=lambda x: getattr(x[IDX_RINGENTRY], sort_key_field_name), reverse=reverse_sort) if limit_output_count is not None and limit_output_count > total_entries: print ("NOTE: Output count `%d' is too large; showing all %d entries" % (limit_output_count, total_entries)); limit_output_count = total_entries if len(chosen_cpus) < kern.globals.real_ncpus: print("NOTE: Limiting to entries from cpu%s %s" % ("s" if len(chosen_cpus) > 1 else "", str(chosen_cpus))) if limit_output_count is not None and limit_output_count < total_entries: entries_to_display = limit_output_count print("NOTE: Limiting to the %s" % ("first entry" if entries_to_display == 1 else ("first %d entries" % entries_to_display))) else: entries_to_display = total_entries print(headerString()) for x in range(entries_to_display): print(entryString(entries[x])) if backtraces: for btidx in range(max_backtraces): nextbt = entries[x][IDX_RINGENTRY].backtrace[btidx] if nextbt == 0: break print("\t" + GetSourceInformationForAddress(nextbt)) @lldb_command('iotrace', 'C:N:S:RB') def IOTrace_cmd(cmd_args=[], cmd_options={}): """ Prints the iotrace ring buffers for all CPUs by default. Arguments: -B : Print backtraces for each ring entry -C [,...,] : Limit trace entries to those generated by the specified CPUs (each cpuSpec can be a single CPU number or a range separated by a dash (e.g. 
"0-3")) -N : Limit output to the first entries (across all chosen CPUs) -R : Display results in reverse-sorted order (oldest first; default is newest-first) -S : Sort output by specified iotrace_entry_t field name (instead of by timestamp) """ MAX_IOTRACE_BACKTRACES = 16 if not hasattr(kern.globals, 'iotrace_entries_per_cpu'): print("Sorry, iotrace is not supported.") return if kern.globals.iotrace_entries_per_cpu == 0: print("Sorry, iotrace is disabled.") return hdrString = lambda : "%-19s %-8s %-10s %-20s SZ %-18s %-17s DATA" % ( "START TIME", "DURATION", "CPU#[RIDX]", " TYPE", " VIRT ADDR", " PHYS ADDR") entryString = lambda x : "%-20u(%6u) %6s[%02d] %-20s %-2d 0x%016x 0x%016x 0x%x" % ( x[IDX_RINGENTRY].start_time_abs, x[IDX_RINGENTRY].duration, "CPU%d" % x[IDX_CPU], x[IDX_RINGPOS], str(x[IDX_RINGENTRY].iotype).split("=")[1].strip(), x[IDX_RINGENTRY].size, x[IDX_RINGENTRY].vaddr, x[IDX_RINGENTRY].paddr, x[IDX_RINGENTRY].val) Trace_cmd(cmd_args, cmd_options, hdrString, entryString, 'iotrace_ring', kern.globals.iotrace_entries_per_cpu, MAX_IOTRACE_BACKTRACES) @lldb_command('ttrace', 'C:N:S:RB') def TrapTrace_cmd(cmd_args=[], cmd_options={}): """ Prints the iotrace ring buffers for all CPUs by default. Arguments: -B : Print backtraces for each ring entry -C [,...,] : Limit trace entries to those generated by the specified CPUs (each cpuSpec can be a single CPU number or a range separated by a dash (e.g. 
"0-3")) -N : Limit output to the first entries (across all chosen CPUs) -R : Display results in reverse-sorted order (oldest first; default is newest-first) -S : Sort output by specified traptrace_entry_t field name (instead of by timestamp) """ MAX_TRAPTRACE_BACKTRACES = 8 if kern.arch != "x86_64": print("Sorry, ttrace is an x86-only command.") return hdrString = lambda : "%-30s CPU#[RIDX] VECT INTERRUPTED_THREAD PREMLV INTRLV INTERRUPTED_PC" % ( "START TIME (DURATION [ns])") entryString = lambda x : "%-20u(%6s) %8s[%02d] 0x%02x 0x%016x %6d %6d %s" % ( x[IDX_RINGENTRY].start_time_abs, str(x[IDX_RINGENTRY].duration) if hex(x[IDX_RINGENTRY].duration) != "0xffffffffffffffff" else 'inprog', "CPU%d" % x[IDX_CPU], x[IDX_RINGPOS], int(x[IDX_RINGENTRY].vector), x[IDX_RINGENTRY].curthread, x[IDX_RINGENTRY].curpl, x[IDX_RINGENTRY].curil, GetSourceInformationForAddress(x[IDX_RINGENTRY].interrupted_pc)) Trace_cmd(cmd_args, cmd_options, hdrString, entryString, 'traptrace_ring', kern.globals.traptrace_entries_per_cpu, MAX_TRAPTRACE_BACKTRACES) # Yields an iterator over all the sysctls from the provided root. # Can optionally filter by the given prefix def IterateSysctls(root_oid, prefix="", depth = 0, parent = ""): headp = root_oid for pp in IterateListEntry(headp, 'oid_link', 's'): node_str = "" if prefix != "": node_str = str(pp.oid_name) if parent != "": node_str = parent + "." + node_str if node_str.startswith(prefix): yield pp, depth, parent else: yield pp, depth, parent type = pp.oid_kind & 0xf if type == 1 and pp.oid_arg1 != 0: if node_str == "": next_parent = str(pp.oid_name) if parent != "": next_parent = parent + "." + next_parent else: next_parent = node_str # Only recurse if the next parent starts with our allowed prefix. # Note that it's OK if the parent string is too short (because the prefix might be for a deeper node). 
prefix_len = min(len(prefix), len(next_parent)) if next_parent[:prefix_len] == prefix[:prefix_len]: for x in IterateSysctls(Cast(pp.oid_arg1, "struct sysctl_oid_list *"), prefix, depth + 1, next_parent): yield x @lldb_command('showsysctls', 'P:') def ShowSysctls(cmd_args=[], cmd_options={}): """ Walks the list of sysctl data structures, printing out each during traversal. Arguments: -P : Limit output to sysctls starting with the specified prefix. """ if '-P' in cmd_options: _ShowSysctl_prefix = cmd_options['-P'] allowed_prefixes = _ShowSysctl_prefix.split('.') if allowed_prefixes: for x in range(1, len(allowed_prefixes)): allowed_prefixes[x] = allowed_prefixes[x - 1] + "." + allowed_prefixes[x] else: _ShowSysctl_prefix = '' allowed_prefixes = [] for sysctl, depth, parentstr in IterateSysctls(kern.globals.sysctl__children, _ShowSysctl_prefix): if parentstr == "": parentstr = "" headp = sysctl st = (" " * depth * 2) + str(sysctl.GetSBValue().Dereference()).replace("\n", "\n" + (" " * depth * 2)) print('parent = "%s"' % parentstr, st[st.find("{"):]) @lldb_command('showexperiments', 'F') def ShowExperiments(cmd_args=[], cmd_options={}): """ Shows any active kernel experiments being run on the device via trial. Arguments: -F: Scan for changed experiment values even if no trial identifiers have been set. 
""" treatment_id = str(kern.globals.trial_treatment_id) experiment_id = str(kern.globals.trial_experiment_id) deployment_id = kern.globals.trial_deployment_id._GetValueAsSigned() if treatment_id == "" and experiment_id == "" and deployment_id == -1: print("Device is not enrolled in any kernel experiments.") if not '-F' in cmd_options: return else: print("""Device is enrolled in a kernel experiment: treatment_id: %s experiment_id: %s deployment_id: %d""" % (treatment_id, experiment_id, deployment_id)) print("Scanning sysctl tree for modified factors...") kExperimentFactorFlag = 0x00100000 formats = { "IU": gettype("unsigned int *"), "I": gettype("int *"), "LU": gettype("unsigned long *"), "L": gettype("long *"), "QU": gettype("uint64_t *"), "Q": gettype("int64_t *") } for sysctl, depth, parentstr in IterateSysctls(kern.globals.sysctl__children): if sysctl.oid_kind & kExperimentFactorFlag: spec = cast(sysctl.oid_arg1, "struct experiment_spec *") # Skip if arg2 isn't set to 1 (indicates an experiment factor created without an experiment_spec). if sysctl.oid_arg2 == 1: if spec.modified == 1: fmt = str(sysctl.oid_fmt) ptr = spec.ptr t = formats.get(fmt, None) if t: value = cast(ptr, t) else: # Unknown type continue name = str(parentstr) + "." 
+ str(sysctl.oid_name) print("%s = %d (Default value is %d)" % (name, dereference(value), spec.original_value)) from memory import * from process import * from ipc import * from pmap import * from ioreg import * from mbufs import * from net import * from skywalk import * from kext import * from kdp import * from userspace import * from pci import * from misc import * from apic import * from scheduler import * from structanalyze import * from ipcimportancedetail import * from bank import * from turnstile import * from kasan import * from kauth import * from waitq import * from usertaskgdbserver import * from ktrace import * from xnutriage import * from kmtriage import * from kevent import * from workqueue import * from ulock import * from ntstat import * from zonetriage import * from sysreg import * from counter import * from refgrp import * from workload import * from recount import * from log import showLogStream, show_log_stream_info from nvram import * from exclaves import *