from collections import namedtuple
from functools import cached_property
import os
import io
from typing import Any, Generator
import core
from uuid import UUID

from core.cvalue import (
    unsigned,
    signed,
    addressof
)
from core.caching import (
    cache_dynamically,
    LazyTarget,
)
from core.io import SBProcessRawIO
from macho import MachOSegment, MemMachO, VisualMachoMap

from xnu import (
    IterateLinkedList,
    lldb_alias,
    lldb_command,
    lldb_run_command,
    lldb_type_summary,
    kern,
    Cast,
    header,
    GetLongestMatchOption,
    debuglog,
    dsymForUUID,
    addDSYM,
    loadDSYM,
    ArgumentError,
    ArgumentStringToInt,
    GetObjectAtIndexFromArray,
    ResolveFSPath,
    uuid_regex,
    GetLLDBThreadForKernelThread
)

import kmemory
import macho
import lldb
import concurrent.futures


#
# Summary of information available about a kext.
#
#   uuid     - UUID of the object
#   vmaddr   - VA of the text segment
#   name     - Name of the kext
#   address  - Kext address
#   segments - Mach-O segments (if available)
#   summary  - OSKextLoadedSummary
#   kmod     - kmod_info_t
class KextSummary:
    def __init__(self, uuid: str, vmaddr, name: str, address: int, segments: list[MachOSegment], summary: core.value):
        self.uuid = uuid
        self.vmaddr = vmaddr
        self.name = name
        self.address = address
        self.segments = segments
        self.summary = summary

    @cached_property
    def kmod(self):
        """ kmod_info_t whose address equals this summary's address.

            Looked up lazily on first access and cached afterwards.
            None when no such kmod is found or the lookup raises ValueError.
        """
        try:
            kmod = GetKmodWithAddr(unsigned(self.address))
        except ValueError:
            kmod = None

        return kmod


# Segment helpers


def text_segment(segments):
    """ Return TEXT segment if present in the list or the first one.

        segments: List of MachOSegment.
        returns: '__TEXT_EXEC' if present, else '__TEXT', else segments[0].
    """

    text_segments = {
        s.name: s
        for s in segments
        if s.name in ('__TEXT_EXEC', '__TEXT')
    }

    # Pick text segment based on our preferred order.
    for name in ['__TEXT_EXEC', '__TEXT']:
        if name in text_segments:
            return text_segments[name]

    return segments[0]


def seg_contains(segments, addr):
    """ Returns generator of all segments that contains given address. """

    return (
        s for s in segments
        if s.vmaddr <= addr < (s.vmaddr + s.vmsize)
    )


def sec_contains(sections, addr):
    """ Returns generator of all sections that contains given address. """

    return (
        s for s in sections
        if s.addr <= addr < (s.addr + s.size)
    )


def sbsec_contains(target, sbsections, addr):
    """ Returns generator of all SBSections that contains given address. """

    return (
        s for s in sbsections
        if s.GetLoadAddress(target) <= addr < s.GetLoadAddress(target) + s.GetByteSize()
    )


# Summary helpers


def LoadMachO(address, size):
    """ Parses Mach-O headers in given VA range.

        return: MemMachO instance.
    """

    process = LazyTarget.GetProcess()
    procio = SBProcessRawIO(process, address, size)
    bufio = io.BufferedRandom(procio)
    return macho.MemMachO(bufio)


def IterateKextSummaries(target) -> Generator[KextSummary, Any, None]:
    """ Generator walking over all kext summaries.

        Reads the 'gLoadedKextSummaries' global and yields one KextSummary
        per entry of its 'summaries' array ('numSummaries' entries).
    """

    hdr = target.chkFindFirstGlobalVariable('gLoadedKextSummaries').Dereference()
    arr = hdr.GetValueForExpressionPath('.summaries[0]')
    total = hdr.xGetIntegerByName('numSummaries')

    for kext in (core.value(e.AddressOf()) for e in arr.xIterSiblings(0, total)):
        # Load Mach-O segments/sections.
        mobj = LoadMachO(unsigned(kext.address), unsigned(kext.size))

        # Construct kext summary.
        yield KextSummary(
            uuid=GetUUIDSummary(kext.uuid),
            vmaddr=text_segment(mobj.segments).vmaddr,
            name=str(kext.name),
            address=unsigned(kext.address),
            segments=mobj.segments,
            summary=kext
        )


@cache_dynamically
def GetAllKextSummaries(target=None) -> list[KextSummary]:
    """ Return all kext summaries. (cached) """

    return list(IterateKextSummaries(target))


def FindKextSummary(kmod_addr):
    """ Returns summary for given kmod_info_t.

        A summary matches when either its address or the VA of its text
        segment equals kmod_addr. Returns None when nothing matches.
    """

    for mod in GetAllKextSummaries():
        if mod.address == kmod_addr or mod.vmaddr == kmod_addr:
            return mod

    return None


# Keep this around until DiskImages2 migrate over to new methods above.
def GetKextLoadInformation(addr=0, show_progress=False):
    """ Original wrapper kept for backwards compatibility.

        addr: when non-zero, return a one element list holding the result of
              FindKextSummary(addr) (the element is None if nothing matched).
              Otherwise return all summaries.
        show_progress: accepted for compatibility, not used.
    """
    if addr:
        return [FindKextSummary(addr)]
    else:
        return GetAllKextSummaries()


@lldb_command('showkextmacho')
def ShowKextMachO(cmd_args=[]):
    """ Show visual Mach-O layout.

        Syntax: (lldb) showkextmacho <kext name>
    """
    if len(cmd_args) != 1:
        raise ArgumentError("kext name is missing")

    for kext in GetAllKextSummaries():

        # Skip not matching kexts (substring match on the kext name).
        if cmd_args[0] not in kext.name:
            continue

        # Load Mach-O segments/sections.
        mobj = LoadMachO(unsigned(kext.kmod.address), unsigned(kext.kmod.size))

        p = VisualMachoMap(kext.name)
        p.printMachoMap(mobj)
        print(" \n")


_UNKNOWN_UUID = "........-....-....-....-............"


@lldb_type_summary(['uuid_t'])
@header("")
def GetUUIDSummary(uuid):
    """ returns a UUID string in form CA50DA4C-CA10-3246-B8DC-93542489AA26

        uuid - Address of a memory where UUID is stored.

        Returns the _UNKNOWN_UUID placeholder when the 16 bytes can't be read.
    """

    err = lldb.SBError()
    addr = unsigned(addressof(uuid))
    data = LazyTarget.GetProcess().ReadMemory(addr, 16, err)

    if not err.Success():
        return _UNKNOWN_UUID

    return str(UUID(bytes=data)).upper()


@lldb_type_summary(['kmod_info_t *'])
@header((
    "{0: <20s} {1: <20s} {2: <20s} {3: >3s} {4: >5s} {5: <20s} {6: <20s} "
    "{7: >20s} {8: <30s}"
).format('kmod_info', 'address', 'size', 'id', 'refs', 'TEXT exec', 'size',
         'version', 'name'))
def GetKextSummary(kmod):
    """ returns a string representation of kext information """
    if not kmod:
        return "kmod info missing"

    format_string = (
        "{mod: <#020x} {mod.address: <#020x} {mod.size: <#020x} "
        "{mod.id: >3d} {mod.reference_count: >5d} {seg.vmaddr: <#020x} "
        "{seg.vmsize: <#020x} {mod.version: >20s} {mod.name: <30s}"
    )

    # Try to obtain text segment from kext summary
    summary = FindKextSummary(unsigned(kmod.address))
    if summary:
        seg = text_segment(summary.segments)
    else:
        # Fake text segment for pseudo kexts.
        seg = MachOSegment('__TEXT', kmod.address, kmod.size, 0, kmod.size, [])

    return format_string.format(mod=kmod, seg=seg)


def GetKmodWithAddr(addr):
    """ Go through kmod list and find one with begin_addr as addr.

        returns: None if not found else a cvalue of type kmod.
    """

    for kmod in IterateLinkedList(kern.globals.kmod, 'next'):
        if addr == unsigned(kmod.address):
            return kmod

    return None


@lldb_command('showkmodaddr')
def ShowKmodAddr(cmd_args=[]):
    """ Given an address, print the offset and name for the kmod containing it

        Syntax: (lldb) showkmodaddr <addr>
    """
    if len(cmd_args) < 1:
        raise ArgumentError("Insufficient arguments")

    addr = ArgumentStringToInt(cmd_args[0])

    # For every summary, pick the first segment (if any) covering the address.
    sumseg = (
        (m, next(seg_contains(m.segments, addr), None))
        for m in GetAllKextSummaries()
    )

    print(GetKextSummary.header)
    for ksum, segment in (t for t in sumseg if t[1] is not None):
        summary = GetKextSummary(ksum.kmod)
        print(summary + " segment: {} offset = {:#0x}".format(
            segment.name, (addr - segment.vmaddr)))

    return True


def GetOSKextVersion(version_num):
    """ returns a string of format 1.2.3x from the version_num

        params: version_num - int
        return: str ("invalid" when version_num is -1)
    """
    if version_num == -1:
        return "invalid"

    (MAJ_MULT, MIN_MULT) = (1000000000000, 100000000)
    (REV_MULT, STAGE_MULT) = (10000, 1000)

    # Peel the packed decimal fields off one by one, most significant first.
    vers_major, rest = divmod(version_num, MAJ_MULT)
    vers_minor, rest = divmod(rest, MIN_MULT)
    vers_revision, rest = divmod(rest, REV_MULT)
    vers_stage, vers_stage_level = divmod(rest, STAGE_MULT)

    out_str = "%d.%d" % (vers_major, vers_minor)
    if vers_revision > 0:
        out_str += ".%d" % vers_revision

    # Stage code -> suffix; any other stage code adds no suffix.
    stage_suffix = {1: "d", 3: "a", 5: "b", 6: "fc"}.get(vers_stage)
    if stage_suffix is not None:
        out_str += "%s%d" % (stage_suffix, vers_stage_level)

    return out_str


def FindKmodNameForAddr(addr):
    """ Given an address, return the name of the kext containing that address.

        Returns None when no kext segment covers the address.
    """

    names = (
        mod.kmod.name
        for mod in GetAllKextSummaries()
        if (any(seg_contains(mod.segments, unsigned(addr))))
    )

    return next(names, None)


@lldb_command('showallkmods')
def ShowAllKexts(cmd_args=None):
    """ Display a summary listing of all loaded kexts (alias: showallkexts)
    """

    print("{: <36s} ".format("UUID") + GetKextSummary.header)

    for kmod in IterateLinkedList(kern.globals.kmod, 'next'):
        ksum = FindKextSummary(unsigned(kmod.address))

        if ksum:
            _ksummary = GetKextSummary(ksum.kmod)
            uuid = ksum.uuid
        else:
            # No loaded summary for this kmod, print it without a UUID.
            _ksummary = GetKextSummary(kmod)
            uuid = _UNKNOWN_UUID

        print(uuid + " " + _ksummary)


@lldb_command('showallknownkmods')
def ShowAllKnownKexts(cmd_args=None):
    """ Display a summary listing of all kexts known in the system.
        This is particularly useful to find if some kext was unloaded
        before this crash'ed state.
    """
    kext_ptr = kern.globals.sKextsByID
    kext_count = unsigned(kext_ptr.count)

    print("%d kexts in sKextsByID:" % kext_count)
    print("{0: <20s} {1: <20s} {2: >5s} {3: >20s} {4: <30s}".format('OSKEXT *', 'load_addr', 'id', 'version', 'name'))
    format_string = "{0: <#020x} {1: <20s} {2: >5s} {3: >20s} {4: <30s}"

    for kext_dict in (GetObjectAtIndexFromArray(kext_ptr.dictionary, i)
                      for i in range(kext_count)):

        kext_name = str(kext_dict.key.string)
        osk = Cast(kext_dict.value, 'OSKext *')

        # Placeholders shown for kexts that are known but not loaded.
        load_addr = "------"
        load_tag = "--"

        if int(osk.flags.loaded):
            load_addr = "{0: <#020x}".format(osk.kmod_info)
            load_tag = "{0: >5d}".format(osk.loadTag)

        version_num = signed(osk.version)
        version = GetOSKextVersion(version_num)
        print(format_string.format(osk, load_addr, load_tag, version, kext_name))


def FetchDSYM(kinfo):
    """ Obtains and adds dSYM based on kext summary. """

    # No op for built-in modules. Compared case-insensitively, like the other
    # UUID comparisons in this file.
    kernel_uuid = str(kern.globals.kernel_uuid_string)
    if kernel_uuid.lower() == kinfo.uuid.lower():
        print("(built-in)")
        return

    # Obtain and load binary from dSYM.
    print("Fetching dSYM for %s" % kinfo.uuid)
    info = dsymForUUID(kinfo.uuid)
    if info and 'DBGSymbolRichExecutable' in info:
        print("Adding dSYM (%s) for %s" % (kinfo.uuid, info['DBGSymbolRichExecutable']))
        addDSYM(kinfo.uuid, info)
        loadDSYM(kinfo.uuid, kinfo.vmaddr, kinfo.segments)
    else:
        print("Failed to get symbol info for %s" % kinfo.uuid)


def AddKextSymsByFile(filename, slide):
    """ Add kext based on file name and slide.

        filename: path of the executable to add as a module.
        slide: load address given by the user, or None to look it up by
               matching the module UUID against loaded kext summaries.
        raises: ArgumentError when no load address can be determined.
    """
    sections = None

    filespec = lldb.SBFileSpec(filename, False)
    print("target modules add \"{:s}\"".format(filename))
    print(lldb_run_command("target modules add \"{:s}\"".format(filename)))

    loaded_module = LazyTarget.GetTarget().FindModule(filespec)
    if loaded_module.IsValid():
        uuid_str = loaded_module.GetUUIDString()
        debuglog("added module {:s} with uuid {:s}".format(filename, uuid_str))

        if slide is None:
            for k in GetAllKextSummaries():
                debuglog(k.uuid)
                if k.uuid.lower() == uuid_str.lower():
                    slide = k.vmaddr
                    sections = k.segments
                    debuglog("found the slide {:#0x} for uuid {:s}".format(k.vmaddr, k.uuid))

    if slide is None:
        raise ArgumentError("Unable to find load address for module described at {:s} ".format(filename))

    if not sections:
        # Only a slide is known, load the whole module at that slide.
        cmd_str = "target modules load --file \"{:s}\" --slide {:s}".format(filename, str(slide))
        debuglog(cmd_str)
    else:
        # Segments are known, place each one at its own address.
        cmd_str = "target modules load --file \"{:s}\"".format(filename)
        for s in sections:
            cmd_str += " {:s} {:#0x} ".format(s.name, s.vmaddr)
        debuglog(cmd_str)

    lldb.debugger.HandleCommand(cmd_str)

    kern.symbolicator = None
    return True


def AddKextSymsByName(kextname, all=False):
    """ Add kext based on longest name match

        kextname: name (or part of it) to match against loaded kext names.
        all: load every matching kext instead of failing on ambiguous match.
        returns: True when symbols were requested, False otherwise.
    """

    kexts = GetLongestMatchOption(kextname, [x.name for x in GetAllKextSummaries()], True)
    if not kexts:
        print("No matching kext found.")
        return False

    if len(kexts) != 1 and not all:
        print("Ambiguous match for name: {:s}".format(kextname))
        if len(kexts) > 0:
            print("Options are:\n\t" + "\n\t".join(kexts))
        return False

    # Load all matching dSYMs
    for ksum in GetAllKextSummaries():
        if ksum.name in kexts:
            debuglog("matched the kext to name {:s} "
                     "and uuid {:s}".format(ksum.name, ksum.uuid))
            FetchDSYM(ksum)

    kern.symbolicator = None
    return True


def AddKextByAddress(addr: int):
    """ Given an address, load the kext which contains that address """

    match = (
        (kinfo, seg_contains(kinfo.segments, addr))
        for kinfo in GetAllKextSummaries()
        if any(seg_contains(kinfo.segments, addr))
    )

    # Load all kexts which contain given address.
    print(GetKextSummary.header)
    for kinfo, segs in match:
        for s in segs:
            print(f"{GetKextSummary(kinfo.kmod)} segment: {s.name} offset = {(addr - s.vmaddr):0x}")
        # One fetch per kext, regardless of how many segments matched.
        FetchDSYM(kinfo)

    # Drop the cached symbolicator, as the other loaders in this file do
    # after adding symbols.
    kern.symbolicator = None


def AddKextByThread(addr: str):
    """ Given a thread, load all kexts needed to symbolicate its backtrace """

    thread_value = kern.GetValueFromAddress(addr, "thread_t")
    thread_lldb_SBThread = GetLLDBThreadForKernelThread(thread_value)

    # Keyed by UUID so that every kext is fetched only once.
    kexts_needed = dict()
    printed_header = False
    for frame in thread_lldb_SBThread.frames:
        # Only frames without a name still need symbols.
        if not frame.name:
            frame_addr = frame.GetPC()

            # A list (not a generator) so that emptiness can be tested below.
            match = [
                (kinfo, seg_contains(kinfo.segments, frame_addr))
                for kinfo in GetAllKextSummaries()
                if any(seg_contains(kinfo.segments, frame_addr))
            ]

            if match and not printed_header:
                print(GetKextSummary.header)
                printed_header = True

            for kinfo, segs in match:
                for s in segs:
                    print(f"{GetKextSummary(kinfo.kmod)} segment: {s.name} offset = {(frame_addr - s.vmaddr):0x}")
                kexts_needed[kinfo.uuid] = kinfo

    print(f"Fetching {len(kexts_needed)} dSyms")

    # Leaving the 'with' block waits for all submitted fetches to finish,
    # also when submitting raises.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for kinfo in kexts_needed.values():
            pool.submit(FetchDSYM, kinfo)

    # Drop the cached symbolicator, as the other loaders in this file do
    # after adding symbols.
    kern.symbolicator = None


def AddKextByUUID(uuid: str):
    """ Loads the dSym for a specific UUID, or all dSym

        uuid: UUID string (any case) or the literal "all".
        raises: ArgumentError when uuid is neither "all" nor UUID shaped.
    """

    # Summaries are compared lower-cased, normalize the argument the same way.
    uuid = uuid.lower()

    kernel_uuid = str(kern.globals.kernel_uuid_string).lower()
    load_all_kexts = (uuid == "all")
    if not load_all_kexts and len(uuid_regex.findall(uuid)) == 0:
        raise ArgumentError("Unknown argument {:s}".format(uuid))

    # Leaving the 'with' block waits for all submitted fetches to finish,
    # also when submitting raises.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for ksum in GetAllKextSummaries():
            cur_uuid = ksum.uuid.lower()
            if load_all_kexts or (uuid == cur_uuid):
                # Kexts sharing the kernel UUID are skipped.
                if kernel_uuid != cur_uuid:
                    pool.submit(FetchDSYM, ksum)

    kern.symbolicator = None


@lldb_command('addkext', 'AF:T:N:')
def AddKextSyms(cmd_args=[], cmd_options={}):
    """ Add kext symbols into lldb.
        This command finds symbols for a uuid and load the required executable
        Usage:
            addkext <uuid> : Load one kext based on uuid. eg. (lldb)addkext 4DD2344C0-4A81-3EAB-BDCF-FEAFED9EB73E
            addkext -F <abs/path/to/executable> : Load kext with executable
            addkext -F <abs/path/to/executable> <load_address> : Load kext with executable at specified load address
            addkext -N <name> : Load one kext that matches the name provided. eg. (lldb) addkext -N corecrypto
            addkext -N <name> -A: Load all kext that matches the name provided. eg. to load all kext with Apple in name do (lldb) addkext -N Apple -A
            addkext -T <thread>: Given a thread, load all kexts needed to symbolicate its backtrace
            addkext all : Will load all the kext symbols - SLOW
    """

    # Load kext by file name.
    if "-F" in cmd_options:
        exec_path = cmd_options["-F"]
        exec_full_path = ResolveFSPath(exec_path)
        if not os.path.exists(exec_full_path):
            raise ArgumentError("Unable to resolve {:s}".format(exec_path))

        if not os.path.isfile(exec_full_path):
            raise ArgumentError(
                (
                    "Path is {:s} not a filepath.\n"
                    "Please check that path points to executable.\n"
                    "For ex. path/to/Symbols/IOUSBFamily.kext/Contents/PlugIns/AppleUSBHub.kext/Contents/MacOS/AppleUSBHub.\n"
                    "Note: LLDB does not support adding kext based on directory paths like gdb used to."
                ).format(exec_path))

        # Optional positional argument is the load address.
        slide_value = None
        if cmd_args:
            slide_value = cmd_args[0]
            debuglog("loading slide value from user input {:s}".format(cmd_args[0]))

        return AddKextSymsByFile(exec_full_path, slide_value)

    # Load kext by name.
    if "-N" in cmd_options:
        kext_name = cmd_options["-N"]
        return AddKextSymsByName(kext_name, "-A" in cmd_options)

    # Load all kexts needed to symbolicate a thread's backtrace
    if "-T" in cmd_options:
        return AddKextByThread(cmd_options["-T"])

    # Load kexts by UUID or "all"
    if len(cmd_args) < 1:
        raise ArgumentError("No arguments specified.")

    uuid = cmd_args[0].lower()
    return AddKextByUUID(uuid)


@lldb_command('addkextaddr')
def AddKextAddr(cmd_args=[]):
    """ Given an address, load the kext which contains that address

        Syntax: (lldb) addkextaddr <addr>
    """
    if len(cmd_args) < 1:
        raise ArgumentError("Insufficient arguments")

    addr = ArgumentStringToInt(cmd_args[0])
    AddKextByAddress(addr)


class KextMemoryObject(kmemory.MemoryObject):
    """ Describes an object landing in some kext """

    MO_KIND = "kext mach-o"

    def __init__(self, kmem, address, kinfo):
        super().__init__(kmem, address)
        self.kinfo = kinfo
        self.target = kmem.target

    @property
    def object_range(self):
        """ Range of the section holding the address, or of its segment when
            no section of that segment covers the address.
        """
        seg = next(seg_contains(self.kinfo.segments, self.address))
        sec = next(sec_contains(seg.sections, self.address), None)
        if sec:
            return kmemory.MemoryRange(sec.addr, sec.addr + sec.size)
        return kmemory.MemoryRange(seg.vmaddr, seg.vmaddr + seg.vmsize)

    def find_mod_seg_sect(self):
        """ Find the first loaded module section covering the address.

            returns: (module, section, subsection) where subsection may be
                     None, or (None, None, None) when no module matches.
        """
        target = self.target
        address = self.address

        return next((
            (module, segment, next(sbsec_contains(target, segment, address), None))
            for module in target.module_iter()
            for segment in sbsec_contains(target, module.section_iter(), address)
        ), (None, None, None))

    def describe(self, verbose=False):
        """ Print kext, module, section/segment and symbol for the address. """
        from lldb.utils.symbolication import Symbolicator

        addr = self.address
        kinfo = self.kinfo

        sbmod, sbseg, sbsec = self.find_mod_seg_sect()
        if sbmod is None:
            # No module covers the address yet, try loading the dSYM and retry.
            FetchDSYM(kinfo)
            print()
            sbmod, sbseg, sbsec = self.find_mod_seg_sect()

        syms = Symbolicator.InitWithSBTarget(self.target).symbolicate(addr)
        sym = next(iter(syms)) if syms else None

        if not sbseg:
            # not really an SBSection but we only need to pretty print 'name'
            # which both have, yay duck typing
            sbseg = next(seg_contains(kinfo.segments, addr), None)

        fmt = "Kext Symbol Info\n"
        fmt += " kext : {kinfo.name} ({kinfo.uuid})\n"
        fmt += " module : {sbmod.file.basename}\n" if sbmod else ""
        fmt += " section : {sbseg.name} {sbsec.name}\n" if sbsec else \
               " segment : {sbseg.name}\n" if sbseg else ""
        fmt += " symbol : {sym!s}\n" if sym else ""

        print(fmt.format(kinfo=kinfo, sbmod=sbmod, sbseg=sbseg, sbsec=sbsec, sym=sym))


class MainBinaryMemoryObject(kmemory.MemoryObject):
    """ Describes an object landing in the main kernel binary """

    MO_KIND = "kernel mach-o"

    def __init__(self, kmem, address, section):
        super().__init__(kmem, address)
        self.section = section
        self.target = kmem.target

    def _subsection(self):
        """ First subsection of self.section covering the address or None. """
        return next(sbsec_contains(self.target, self.section, self.address), None)

    @property
    def object_range(self):
        """ Range of the covering subsection, or of the whole section. """
        target = self.target
        section = self._subsection() or self.section
        addr = section.GetLoadAddress(target)
        size = section.GetByteSize()
        return kmemory.MemoryRange(addr, addr + size)

    @property
    def module(self):
        """ File name of the target's module at index 0. """
        return self.target.GetModuleAtIndex(0).GetFileSpec().GetFilename()

    @property
    def uuid(self):
        """ UUID string of the target's module at index 0. """
        return self.target.GetModuleAtIndex(0).GetUUIDString()

    def describe(self, verbose=False):
        """ Print module, uuid, section/segment and symbol for the address. """
        from lldb.utils.symbolication import Symbolicator

        subsec = self._subsection()
        syms = Symbolicator.InitWithSBTarget(self.target).symbolicate(self.address)
        sym = next(iter(syms)) if syms else None

        fmt = "Symbol Info\n"
        fmt += " module : {mo.module}\n"
        fmt += " uuid : {mo.uuid}\n"
        fmt += " section : {mo.section.name} {subsec.name}\n" if subsec else ""
        fmt += " segment : {mo.section.name}\n" if not subsec else ""
        fmt += " symbol : {sym}\n" if sym else ""

        print(fmt.format(mo=self, subsec=subsec, sym=sym))


@kmemory.whatis_provider
class KextWhatisProvider(kmemory.WhatisProvider):
    """ Kext ranges whatis provider """

    COST = 100

    def claims(self, address):
        """ True when the address lies in a section of the module at index 0
            or in a segment of any loaded kext summary.
        """
        target = self.target
        mainmod = target.GetModuleAtIndex(0)

        #
        # TODO: surely the kexts can provide a better range check
        #

        return any(
            sbsec_contains(target, mainmod.section_iter(), address)
        ) or any(
            any(seg_contains(kinfo.segments, address))
            for kinfo in GetAllKextSummaries()
        )

    def lookup(self, address):
        """ Build the memory object describing the address.

            Sections of the module at index 0 are checked first, then the
            loaded kext summaries.
        """
        target = self.target
        mainmod = target.GetModuleAtIndex(0)
        section = next(sbsec_contains(target, mainmod.section_iter(), address), None)

        if section:
            return MainBinaryMemoryObject(self.kmem, address, section)

        return KextMemoryObject(self.kmem, address, next(
            kinfo
            for kinfo in GetAllKextSummaries()
            if any(seg_contains(kinfo.segments, address))
        ))


# Aliases for backward compatibility.

lldb_alias('showkmod', 'showkmodaddr')
lldb_alias('showkext', 'showkmodaddr')
lldb_alias('showkextaddr', 'showkmodaddr')
lldb_alias('showallkexts', 'showallkmods')