from collections import namedtuple from itertools import chain from xnu import * LogBlock = namedtuple('LogBlock', ['blk_id', 'blk']) LQEntry = namedtuple('LQEntry', ['state', 'size', 'lmid', 'data']) LogStreamElem = namedtuple( 'LogStreamElem', ['blk_id', 'offset', 'lsm', 'ftid']) class PPrinter(object): """ Naive yet helpful pretty printer. """ def __init__(self, O, kv_indent=17): self._O = O self._kv_fmt = "{:<" + str(kv_indent) + "}" def header(self, title, upper_case=False): """ Prints out a header with a given text. """ if upper_case: title = title.upper() print(self._O.format("{VT.Bold}{:s}{VT.EndBold}", title)) def table(self, args): """ Switches to a table mode. """ return self._O.table(args) @staticmethod def print(*values): """ A simple wrapper around `print` providing same functionality. """ print(*values) def kvprint(self, key, fmt, *values): """ Pretty prints aligned `key: values`. The `fmt` argument specifies value(s) format string. """ print(self._kv_fmt.format(key), end=": ") print(fmt.format(*values)) @staticmethod def empty_line(): """ Prints an empty line. """ print(" \n") class FTID(object): """ Represents a firehose tracepoint identifier. Decodes the tracepoint and provides details on its fields. """ TYPE = { 0x00: "default", 0x01: "info", 0x02: "debug", 0x10: "error", 0x11: "fault" } def __init__(self, ftid): self._ftid = ftid @property def value(self): """ Returns a firehose tracepoint id compound value. """ return self._ftid.ftid_value @property def namespace(self): """ Returns a tuple of a namespace identifier and its name. """ ns_id = int(self._ftid.ftid._namespace) ns_name = GetEnumName('firehose_tracepoint_namespace_t', ns_id, 'firehose_tracepoint_namespace') return (ns_id, ns_name.split('_')[-1]) @property def code(self): """ Returns a tracepoint code value. """ return int(self._ftid.ftid._code) @property def flags(self): """ Returns a tuple of a tracepoint flag and its name. """ flag_id = unsigned(self._ftid.ftid._flags) flag_name = GetEnumName('firehose_tracepoint_flags_t', flag_id, '_firehose_tracepoint_flags_') parts = flag_name.split('_') flag_name = parts[0] + ":" + " ".join(parts[2:]) return (flag_id, flag_name) @property def type(self): """ Returns a tuple of a tracepoint type and its name. """ # GetEnumName cannot be used because _firehose_tracepoint_type_trace_t # is defined as OS_ENUM which defines values as anonymous enum # unrelated to the type name whatsoever. code_id = int(self._ftid.ftid._type) return (code_id, self.TYPE.get(code_id, "")) class LogBlocks(object): """ A base class for objects using log blocks. """ BLOCK_SIZE = 64 def __init__(self, buf, blk, blk_count, is_ringbuffer): self._buf = buf self._blk = blk self._blk_count = blk_count self._is_ringbuffer = is_ringbuffer @property def block_count(self): """ Returns a total block count. """ return self._blk_count def block_size(self, block_count=1): """ Returns a space size occupied by a given number of blocks. """ return self.BLOCK_SIZE * block_count def block_position(self, cursor): """ Returns a block number and a buffer offset of a log at a provided position (block sequence number). """ blk_id = cursor % self.block_count if self.block_count > 0 else 0 return (blk_id, self.block_size(blk_id)) def blocks(self, head_block=0): """ Returns a generator of log blocks starting at a provided block position. """ blocks = self._blocks_range(head_block, self.block_count) if self._is_ringbuffer: blocks = chain(blocks, self._blocks_range(0, head_block)) return blocks def logs(self, head_block=0): """ Returns a generator of `LogStreamMsg` elements starting at a given block position. """ return (self._log_stream_msg(b.blk_id) for b in self.blocks(head_block) if b.blk) def _blocks_range(self, s, e): return (LogBlock(blk_id=i, blk=self._blk[i]) for i in range(s, e)) def _log_stream_msg(self, blk_id): lsm_offset = self.block_size(blk_id) lsm = Cast(addressof(self._buf[lsm_offset]), 'log_stream_msg_t *') return LogStreamElem(blk_id=blk_id, offset=lsm_offset, lsm=lsm, ftid=FTID(addressof(lsm.lsm_ft.ft_id))) class LogStream(LogBlocks): """ Represents an OSLog Stream buffer. """ def __init__(self, log_stream): super().__init__(log_stream.ls_buf, log_stream.ls_blk, log_stream.ls_blk_count, True) self._stream = log_stream @property def address(self): """ Returns a log stream address. """ return int(self._stream) @property def enabled(self): """ Returns true for enabled log stream, false otherwise. """ return self._stream.ls_enabled @property def reserved(self): """ Returns a total number of reserved bytes. """ return self._stream.ls_reserved @property def logged(self): """ Returns a number of commited bytes and an actual number of logged bytes. """ return (self._stream.ls_commited.v, self._stream.ls_commited.v - self._stream.ls_commited_wraps) def sync_cursor(self, cursor): """ Aligns read cursor so it points to the actual beginning of a log stream. """ commited, logged = self.logged read = cursor if read + self.block_count < logged: read = logged if commited >= self.block_count: read -= self.block_count return read % self.block_count class LogStreamCache(LogBlocks): """ Represents an OSLog Stream Cache. """ def __init__(self, log_cache): super().__init__(log_cache.lc_buf, log_cache.lc_blk, log_cache.lc_blk_count, False) self._cache = log_cache @property def address(self): """ Returns a log stream cache address. """ return addressof(self._cache) @property def stream(self): """ Returns a log stream cached by this instance. """ return LogStream(self._cache.lc_stream) if self._cache.lc_stream else None @property def stream_cursor(self): """ Returns a current log stream read cursor. The value is a block sequence of a block to read from next. """ if not self.stream: return 0 return self.stream.sync_cursor(self._cache.lc_stream_pos) @property def cache_cursor(self): """ Returns a current log cache cursor. The value is a number of a block to read from next. """ return self._cache.lc_blk_pos class MsgBuffer(object): """ Provides access to msgbuf_t ring buffer which is primarily a data type behind system message buffer accessible via dmesg CLI command. """ MSG_MAGIC = 0x063061 def __init__(self, msgbuf): self._msgbuf = msgbuf size = int(self._msgbuf.msg_size) self._buffer = self._msgbuf.msg_bufc.GetSBValue().GetPointeeData(0, size) def __eq__(self, other): return unsigned(self._msgbuf) == unsigned(other._msgbuf) def __hash__(self): return hash(unsigned(self._msgbuf)) @dyn_cached_property def values(self, target=None): """ Returns a list of all log messages. """ return list(self._values()) def __len__(self): """ Returns a number of log messages in the message buffer. """ return len(self.values) def __getitem__(self, i): """ Returns a log message at a given index. """ return self.values[i] def __iter__(self): """ Returns an iterator object of log messages. """ return iter(self.values) def _values(self): """ Returns a generator object of log messages. """ chars = (c for c in (self._at(i) for i in self._range()) if c != 0) line = bytearray() for c in chars: line.append(c) if chr(c) == '\n': yield line.decode(errors='backslashreplace') line = bytearray() yield line.decode(errors='backslashreplace') def _at(self, i): """ Returns a character at a given index. """ err = lldb.SBError() c = self._buffer.GetUnsignedInt8(err, i) if not err.Success(): raise ValueError( "Failed to read character at offset " + str(i) + ": " + err.GetCString()) return c def _range(self): """ Returns character indices starting at a ring buffer head. """ start = int(self._msgbuf.msg_bufx) size = int(self._msgbuf.msg_size) return ((start + i) % size for i in range(0, size)) class LogQueue(object): """ Represents a kernel log queue. """ def __init__(self, cpu, lq): self._cpu = cpu self._lq = lq @property def cpu(self): """ Returns a CPU number the log queue is associated with. """ return self._cpu @property def ptr(self): """ Returns a pointer to enclosed log_queue_s. """ return self._lq @property def address(self): """ Returns a log queue address. """ return int(self.ptr) @property def size(self): """ Returns an actual total log queue size. """ slot_size = 1 << unsigned(self._lq.lq_mem_size_order) return self._lq.lq_cnt_mem_active * slot_size @property def enqueued(self): """ Returns a generator of logs enqueued in the log queue. """ return self._lq_entries(addressof(self._lq.lq_log_list)) @property def dispatched(self): """ Returns a generator of logs dispatched by the log queue. """ return self._lq_entries(addressof(self._lq.lq_dispatch_list)) @staticmethod def _lq_entries(lqe_list): """ Returns a generator of LQEntry elements representing given single-linked list. """ def lq_entry(lqe): lqe_state = GetEnumName('log_queue_entry_state_t', lqe.lqe_state, 'LOG_QUEUE_ENTRY_STATE_') return LQEntry(state=lqe_state, size=lqe.lqe_size, lmid=lqe.lqe_lm_id, data=lqe.lqe_payload) return (lq_entry(e) for e in IterateTAILQ_HEAD(lqe_list, 'lqe_link', 's') if e) @staticmethod def collect(): """ Returns a list of all per-CPU log queues. """ pcpu_lqs = addressof(kern.GetGlobalVariable('percpu_slot_oslog_queue')) pcpu_lqs_type = pcpu_lqs.GetSBValue().GetType().name cpus = range(0, kern.globals.zpercpu_early_count) def PCPUSlot(i): addr = unsigned(pcpu_lqs) + kern.PERCPU_BASE(i) return kern.GetValueFromAddress(addr, pcpu_lqs_type) return [LogQueue(cpu, PCPUSlot(cpu)) for cpu in cpus] def show_log_stream_logs(pp, logs): """ Pretty prints logs metadata. """ hdr = "{0:>5s} {1:>7s} {2:>6s} {3:>12s} {4:>16s} {5:>8s} {6:>9s} {7:>7s} {8:>8s} {9:>18s}" hdr_labels = ( "BLOCK", "OFFSET", "LENGTH", "TIMESTAMP", "FTID", "TID", "NAMESPACE", "TYPE", "CODE", "FLAGS" ) log_hdr = ( "{log.blk_id:>5d} {log.offset:>7d} {ft.ft_length:>6d} {ts:>12d} " "{ftid.value:>16x} {ft.ft_thread:>8d} {ftid.namespace[1]:>9s} " "{ftid.type[1]:>7s} {ftid.code:>08x} {ftid.flags[1]:>18s}" ) log_count = 0 with pp.table(hdr.format(*hdr_labels)): for log in logs: pp.print(log_hdr.format(log=log, ts=log.lsm.lsm_ts, ft=log.lsm.lsm_ft, ftid=log.ftid)) log_count += 1 pp.print("(Shown {:d} logs)".format(log_count)) def show_log_stream_cache(pp, lsc): """ Pretty prints configuration details of a given log cache. """ pp.header("Stream Cache") pp.kvprint("Address", "{:#0x}", lsc.address) pp.kvprint("Size", "{:>d} bytes ({:d} blocks)", lsc.block_size(lsc.block_count), lsc.block_count) pp.kvprint("Next Read Block", "{:d} (Offset: {:d})", *lsc.block_position(lsc.cache_cursor)) def show_log_stream(pp, ls, read_pos): """ Pretty prints configuration details of a given log stream. """ pp.header("Stream") if not ls: pp.print("No log stream configured.") return commited, logged_bytes = ls.logged pp.kvprint("Address", "{:#0x}", ls.address) pp.kvprint("State", "{:s}", "Enabled" if ls.enabled else "Disabled") pp.kvprint("Size", "{:>d} bytes ({:d} blocks)", ls.block_size(ls.block_count), ls.block_count) pp.kvprint("Reserved", "{:d}", ls.reserved) pp.kvprint("Commited", "{:d}", commited) block, offset = ls.block_position(commited) pp.kvprint("Written", "{:d} bytes", logged_bytes) pp.kvprint("Next Write Block", "{:d} (Offset: {:d})", block, offset) block, offset = ls.block_position(read_pos) pp.kvprint("Read", "{:d} bytes", read_pos) pp.kvprint("Next Read Block", "{:d} (Offset: {:d})", block, offset) def show_log_stream_stats(pp): """ Pretty prints values of log stream counters. """ pp.header("Statistics") pp.kvprint("Total Count", "{:s}", GetSimpleCounter(kern.globals.oslog_s_total_msgcount)) pp.kvprint("Metadata Count", "{:s}", GetSimpleCounter(kern.globals.oslog_s_metadata_msgcount)) pp.kvprint("Streamed Logs", "{:s}", GetSimpleCounter(kern.globals.oslog_s_streamed_msgcount)) pp.kvprint("Dropped Logs", "{:s}", GetSimpleCounter(kern.globals.oslog_s_dropped_msgcount)) pp.kvprint("Invalid Logs", "{:s}", GetSimpleCounter(kern.globals.oslog_s_error_count)) def show_log_stream_info(show_meta, O=None): """ Pretty prints configuration details of OSLog Streaming and its current content. """ pp = PPrinter(O) log_cache = LogStreamCache(kern.globals.log_stream_cache) pp.empty_line() pp.header("Overview", True) show_log_stream_stats(pp) show_log_stream_cache(pp, log_cache) show_log_stream(pp, log_cache.stream, log_cache.stream_cursor) if not show_meta: return pp.empty_line() pp.header("Cached Stream Logs", True) show_log_stream_logs(pp, log_cache.logs(0)) if log_cache.stream: pp.empty_line() pp.header("Stream Logs", True) show_log_stream_logs( pp, log_cache.stream.logs(log_cache.stream_cursor)) def show_log_queues(pp, log_queues, show_logs): """ Pretty prints log queue summary and logs. """ hdr = "{:>3s} {:>16s} {:>2s} {:>10s} {:>10s} {:>5s} {:>8s} {:>6s} {:>6s} {:>8s}" hdr_labels = ( "CPU", "ADDRESS", "ON", "STATE", "RECONF", "SLOTS", "SIZE", "AVAIL", "STORED", "EN_ROUTE" ) hdr_lq = ( "{cpu:>3s} {lq.address:>16x} {lq.ptr.lq_ready:>2d} {state:>10s} {reconf:>10s} " "{lq.ptr.lq_cnt_mem_active:>5d} {size:>8d} {lq.ptr.lq_cnt_mem_avail:>6d} " "{enq:>6d} {disp:>8d}" ) def show_log_queue(lq, enqueued, dispatched): state = GetEnumName( 'lq_mem_state_t', lq._lq.lq_mem_state, 'LQ_MEM_STATE_') reconf = GetEnumName( 'lq_req_state_t', lq._lq.lq_req_state, 'LQ_REQ_STATE_') cpu = "N/A" if lq.cpu is None else str(lq.cpu) pp.print(hdr_lq.format(lq=lq, state=state, size=lq.size, reconf=reconf, cpu=cpu, enq=len(enqueued), disp=len(dispatched))) if show_logs: for lq in log_queues: lq_enqueued = list(lq.enqueued) lq_dispatched = list(lq.dispatched) pp.empty_line() with pp.table(hdr.format(*hdr_labels)): show_log_queue(lq, lq_enqueued, lq_dispatched) pp.empty_line() pp.header("Enqueues Log Messages") show_log_queue_logs(pp, lq_enqueued) pp.empty_line() pp.header("Dispatched Log Messages") show_log_queue_logs(pp, lq_dispatched) pp.empty_line() return with pp.table(hdr.format(*hdr_labels)): for lq in log_queues: show_log_queue(lq, list(lq.enqueued), list(lq.dispatched)) def show_log_queue_logs(pp, lqe_list): """ Pretty prints log queue logs. """ if not lqe_list: pp.print("No log messages present") return hdr = ("{:>5s} {:>7s} {:>10s} " "{:>12s} {:>8s} " "{:>16s} {:>9s} {:>7s} {:>8s} {:>18s}") hdr_labels = ( "LM_ID", "LM_SIZE", "STATE", "TIMESTAMP", "LOG_SIZE", "FTID", "NAMESPACE", "TYPE", "CODE", "FLAGS" ) log_hdr = ( "{lqe.lmid:>5x} {lqe.size:>7d} {lqe.state:>10s} " "{d.lp_timestamp:>12d} {d.lp_data_size:>8d} " "{ftid.value:>16x} {ftid.namespace[1]:>9s} " "{ftid.type[1]:>7s} {ftid.code:>08x} {ftid.flags[1]:>18s}" ) with pp.table(hdr.format(*hdr_labels)): for lqe in lqe_list: pp.print(log_hdr.format(lqe=lqe, d=lqe.data, ftid=FTID(lqe.data.lp_ftid))) pp.print("(Shown {:d} logs)".format(len(lqe_list))) @lldb_command('showlogstream', 'L', fancy=True) def showLogStream(cmd_args=None, cmd_options=None, O=None): """ Displays the contents of the log stream and the log stream cache. showlogstream [-L] Options: -L: Show metadata of logs stored in the log stream and the cache """ show_meta = "-L" in cmd_options show_log_stream_info(show_meta, O) @lldb_command('showlq', 'LC:A:', fancy=True) def showLogQueue(cmd_args=None, cmd_options=None, O=None): """ Displays the contents of the log queue. usage: showlq [-C cpu_num][-A addr][-L] -C n Shows a log queue summary of a selected CPU. -A a Shows a log queue summary at a given address. -L Show metadata of enqueued and dispatched logs Options -A and -C are mutually exclusive. If no options provided summaries of all per-CPU log queues are shown. Examples: showlq showlq -C 2 -L showlq -A 0xfffffff123456789 -L """ log_queues = LogQueue.collect() if "-C" in cmd_options and "-A" in cmd_options: raise ArgumentError("Options -A and -C are mutually exclusive.") if "-C" in cmd_options: cpu_no = ArgumentStringToInt(cmd_options['-C']) if cpu_no not in range(0, len(log_queues)): print("CPU number out of [0, {:d}) range".format(len(log_queues))) return log_queues = [lq for lq in log_queues if lq.cpu == cpu_no] elif "-A" in cmd_options: addr = ArgumentStringToInt(cmd_options['-A']) log_queues = [lq for lq in log_queues if addr == lq.address] if not log_queues: lq = kern.GetValueFromAddress(addr, 'log_queue_t') log_queues = [LogQueue(None, lq)] show_logs = "-L" in cmd_options show_log_queues(PPrinter(O), log_queues, show_logs) @lldb_command('showmsgbuf', 'C:F') def showMsgBuf(cmd_args=None, cmd_options=None): """ Displays the contents of msgbuf_t type at a given address. usage: showmsgbuf [-C ][-F] addr -C Shows first or last (if negative) specified number of logs. -F Show the content even if the magic key indicates data corruption. """ if not cmd_args: print(showMsgBuf.__doc__) return addr = kern.GetValueFromAddress(cmd_args[0], 'struct msgbuf *') if addr.msg_magic != MsgBuffer.MSG_MAGIC: print("Error: Invalid msgbuf_t magic key {:#x} (expected {:#x}). " "Invalid address or the content is corrupted.".format(addr.msg_magic, MsgBuffer.MSG_MAGIC)) if not "-F" in cmd_options: return msgbuf = MsgBuffer(addr) if "-C" in cmd_options: count = ArgumentStringToInt(cmd_options['-C']) n = min(abs(count), len(msgbuf)) msgbuf = msgbuf[:n] if count > 0 else msgbuf[-n:] for msg in msgbuf: print(msg.rstrip("\n")) @lldb_command('systemlog', 'C:F') def systemLog(cmd_args=None, cmd_options=None): """ Displays the contents of the system message buffer. usage: systemlog [-C ][-F] -C Shows first or last (if negative) specified number of logs. -F Show the content even if the magic key indicates data corruption. """ showMsgBuf([unsigned(kern.globals.msgbufp)], cmd_options)