import logging
import target
import struct

from xnu import *
from core.operating_system import Armv8_RegisterSet, Armv7_RegisterSet, I386_RegisterSet, X86_64RegisterSet
from core import caching

# These defines should come from an authoritative header file.
CPU_TYPE_I386 = 0x00000007
CPU_TYPE_X86_64 = 0x01000007
CPU_TYPE_ARM = 0x0000000c
CPU_TYPE_ARM64 = 0x0100000c
CPU_TYPE_ARM64_32 = 0x0200000c

# Registers that live in an indexed array inside the saved state
# (saved_state.x[n] on arm64, saved_state.r[n] on arm).
_ARM64_INDEXED_REGS = frozenset('x%d' % i for i in range(29))
_ARM_INDEXED_REGS = frozenset('r%d' % i for i in range(13))
# x86_64 registers that are read from saved_state.isf rather than saved_state.
_X86_64_ISF_REGS = frozenset(('rip', 'rflags', 'cs', 'rsp', 'cpu'))


def GetRegisterSetForCPU(cputype, subtype):
    """Return the list of register descriptions for a CPU type.

    params:
        cputype - int: one of the CPU_TYPE_* constants defined in this file.
        subtype - int: CPU subtype; accepted for interface symmetry, unused.
    returns:
        register_info['registers'] of the matching *_RegisterSet class.
    raises:
        UnboundLocalError if cputype is not one of the known CPU_TYPE_* values.
    """
    if cputype in (CPU_TYPE_ARM64, CPU_TYPE_ARM64_32):
        retval = Armv8_RegisterSet
    elif cputype == CPU_TYPE_ARM:
        retval = Armv7_RegisterSet
    elif cputype == CPU_TYPE_I386:
        retval = I386_RegisterSet
    elif cputype == CPU_TYPE_X86_64:
        retval = X86_64RegisterSet

    # Deliberately no fallback: an unknown cputype crashes on the line below.
    return retval.register_info['registers']


class UserThreadObject(object):
    """Representation of a userspace thread and its saved register state."""

    def __init__(self, thr_obj, cputype, cpusubtype, is_kern_64bit):
        """Locate the saved user register state for a thread.

        params:
            thr_obj       - kernel thread value (provides thread_id and machine).
            cputype       - int: CPU_TYPE_* of the owning process.
            cpusubtype    - int: CPU subtype of the owning process.
            is_kern_64bit - bool: whether the kernel is 64-bit; selects where
                            the 32-bit arm saved state is read from.
        """
        super(UserThreadObject, self).__init__()
        self.thread = thr_obj
        self.registerset = GetRegisterSetForCPU(cputype, cpusubtype)
        self.thread_id = unsigned(self.thread.thread_id)
        # Tests the 0x01000000 bit only, so CPU_TYPE_ARM64_32 (0x0200000c)
        # is classified as not 64-bit and handled in the else branch.
        self.is64Bit = bool(cputype & 0x01000000)

        if self.is64Bit:
            if cputype == CPU_TYPE_X86_64:
                self.reg_type = "x86_64"
                self.saved_state = Cast(self.thread.machine.iss, 'x86_saved_state_t *').uss.ss_64
            elif cputype == CPU_TYPE_ARM64:
                self.reg_type = "arm64"
                self.saved_state = self.thread.machine.upcb.uss.ss_64
        else:
            if cputype == CPU_TYPE_I386:
                self.reg_type = "i386"
                self.saved_state = Cast(self.thread.machine.iss, 'x86_saved_state_t *').uss.ss_32
            elif cputype == CPU_TYPE_ARM:
                self.reg_type = "arm"
                if not is_kern_64bit:
                    self.saved_state = self.thread.machine.PcbData
                else:
                    self.saved_state = self.thread.machine.contextData.ss.uss.ss_32
            elif cputype == CPU_TYPE_ARM64_32:
                # arm64_32 uses the same 64-bit saved state layout as arm64.
                self.reg_type = "arm64"
                self.saved_state = self.thread.machine.upcb.uss.ss_64

        logging.debug("created thread id 0x%x of type %s, is_kern_64bit 0x%x cputype 0x%x"
                      % (self.thread_id, self.reg_type, is_kern_64bit, cputype))

    def getRegisterValueByName(self, name):
        """Return the unsigned value of the register called `name`.

        returns:
            int value read from the saved state, or None for reg_type 'i386'
            (and any other unhandled reg_type), which is not implemented.
        """
        state = self.saved_state

        if self.reg_type == 'arm64':
            if name in _ARM64_INDEXED_REGS:
                # x0..x28 are stored in the 'x' array, indexed by number.
                return unsigned(state.x[int(name[1:])])
            return unsigned(getattr(state, name))

        if self.reg_type == "x86_64":
            if name in _X86_64_ISF_REGS:
                return unsigned(getattr(state.isf, name))
            return unsigned(getattr(state, name))

        if self.reg_type == "arm":
            if name in _ARM_INDEXED_REGS:
                # r0..r12 are stored in the 'r' array, indexed by number.
                return unsigned(state.r[int(name[1:])])
            return unsigned(getattr(state, name))

        # TODO for i386
        return None

    def getName(self):
        """Return the thread id as a decimal string."""
        return str(self.thread_id)

    def getRegisterData(self, reg_num):
        """Return the value of register number `reg_num`.

        returns None if there is error (reg_num outside the register set).
        """
        if reg_num < 0 or reg_num >= len(self.registerset):
            logging.warning("regnum %d is not defined for thread_id 0x%x" % (reg_num, self.thread_id))
            return None
        return self.getRegisterValueByName(self.registerset[reg_num]['name'])


class UserProcess(target.Process):
    """ Represent a user process and thread states """

    def __init__(self, task):
        """Collect process information and thread objects for `task`.

        params:
            task - kernel task value; its bsd_info, t_flags and threads are read.
        """
        self.task = task
        self.proc = Cast(task.bsd_info, 'proc_t')
        dataregisters64bit = False
        ptrsize = 4

        # NOTE(review): 0x1 / 0x2 presumably are the task's 64-bit address and
        # 64-bit data flags -- confirm against the kernel headers.
        if task.t_flags & 0x1:
            ptrsize = 8
        if task.t_flags & 0x2:
            dataregisters64bit = True

        is_kern_64bit = kern.arch in ['x86_64', 'x86_64h', 'arm64', 'arm64e']

        self.cputype = unsigned(self.proc.p_cputype)
        self.cpusubtype = unsigned(self.proc.p_cpusubtype)

        super(UserProcess, self).__init__(self.cputype, self.cpusubtype, ptrsize)
        dbg_message = "process:%s is64bit:%d ptrsize:%d cputype:0x%x cpusubtype:0x%x" % (
            hex(self.proc), int(dataregisters64bit), ptrsize, self.cputype, self.cpusubtype)

        self.proc_platform = int(GetProcPlatform(self.proc))
        if self.proc_platform == xnudefines.P_PLATFORM_MACOS:
            self.hinfo['ostype'] = 'macosx'
        elif self.proc_platform == xnudefines.P_PLATFORM_WATCHOS:
            self.hinfo['ostype'] = 'watchos'
        elif self.proc_platform == xnudefines.P_PLATFORM_TVOS:
            self.hinfo['ostype'] = 'tvos'
        else:
            self.hinfo['ostype'] = 'ios'
        dbg_message += " ostype:%s" % self.hinfo['ostype']

        if is_kern_64bit and str(kern.arch).lower().startswith('arm'):
            addressing_bits = 64 - int(kern.globals.gT1Sz)
            self.hinfo['addressing_bits'] = addressing_bits
            dbg_message += " addressing_bits:%d" % addressing_bits

        self.registerset = GetRegisterSetForCPU(self.cputype, self.cpusubtype)
        logging.info(dbg_message)

        # thread id -> UserThreadObject, plus the ids in queue iteration order.
        self.threads = {}
        self.threads_ids_list = []
        logging.debug("iterating over threads in process")
        for thval in IterateQueue(task.threads, 'thread *', 'task_threads'):
            th_id = unsigned(thval.thread_id)
            self.threads[th_id] = UserThreadObject(thval, self.cputype, self.cpusubtype, is_kern_64bit)
            self.threads_ids_list.append(th_id)

    def getRegisterDataForThread(self, th_id, reg_num):
        """Return the encoded value of register `reg_num` for thread `th_id`.

        returns '' if the thread id or the register number is unknown.
        """
        if th_id not in self.threads:
            # Fixed: the thread id was never substituted into the message.
            logging.critical("0x%x thread id is not found in this task" % th_id)
            return ''
        if reg_num < 0 or reg_num >= len(self.registerset):
            logging.warning("regnum %d is not defined for thread_id 0x%x" % (reg_num, th_id))
            return ''
        value = self.threads[th_id].getRegisterData(reg_num)
        # Floor division keeps bytesize an int on Python 3 as well as Python 2.
        return self.encodeRegisterData(value, bytesize=self.registerset[reg_num]['bitsize'] // 8)

    def getRegisterCombinedDataForThread(self, th_id):
        """Return 'thread:<id>;name:<name>;' followed by one '<index>:<value>;'
        entry per register in the register set, for thread `th_id`.

        returns '' if the thread id is unknown.
        """
        if th_id not in self.threads:
            logging.critical("0x%x thread id is not found in this task" % th_id)
            return ''
        cur_thread = self.threads[th_id]
        pieces = ['thread:%s;name:%s;' % (self.encodeThreadID(th_id), cur_thread.getName())]
        for pos, rinfo in enumerate(self.registerset):
            value = cur_thread.getRegisterValueByName(rinfo['name'])
            # Floor division keeps bytesize an int on Python 3 as well as Python 2.
            encoded = self.encodeRegisterData(value, bytesize=(rinfo['bitsize'] // 8))
            pieces.append("%02x:%s;" % (pos, encoded))
        return ''.join(pieces)

    def getThreadStopInfo(self, th_id):
        """Return the stop-info string for thread `th_id`: 'T02', the combined
        register data, then 'threads:<getThreadsInfo()>;'.

        returns '' if the thread id is unknown.
        """
        if th_id not in self.threads:
            # Fixed: the thread id was never substituted into the message.
            logging.critical("0x%x thread id is not found in this task" % th_id)
            return ''
        return 'T02' + self.getRegisterCombinedDataForThread(th_id) + 'threads:' + self.getThreadsInfo() + ';'

    def getRegisterInfo(self, regnum):
        """Return the 'key:value;' description string for register `regnum`.

        The result is something similar to
        "name:x1;bitsize:64;offset:8;encoding:uint;format:hex;gcc:1;dwarf:1;set:General Purpose Registers;"

        returns 'E45' if regnum is outside the register set.
        """
        # Negative numbers are rejected too; otherwise list indexing would
        # silently wrap around and describe the wrong register.
        if regnum < 0 or regnum >= len(self.registerset):
            logging.debug("No register_info for number %d." % regnum)
            return 'E45'

        rinfo = self.registerset[regnum]
        pieces = []
        for key, val in rinfo.items():
            # The 'set' entry is always reported with this fixed name.
            text = 'General Purpose Registers' if key == 'set' else str(val)
            pieces.append('%s:%s;' % (str(key), text))
        return ''.join(pieces)

    def getProcessInfo(self):
        """Return the process description as a 'key:value;' string, e.g.

        pid:d22c;parent-pid:d34d;real-uid:ecf;real-gid:b;effective-uid:ecf;effective-gid:b;cputype:1000007;cpusubtype:3;
        ostype:macosx;vendor:apple;endian:little;ptrsize:8;
        """
        # NOTE: effective-uid / effective-gid are fixed placeholder values here.
        pinfo = {'effective-uid': 'ecf', 'effective-gid': 'b', 'endian': 'little', 'vendor': 'apple'}
        pinfo['pid'] = "%x" % (GetProcPIDForTask(self.task))
        pinfo['parent-pid'] = "%x" % (unsigned(self.proc.p_ppid))
        pinfo['ptrsize'] = str(self.ptrsize)
        pinfo['ostype'] = 'macosx'
        pinfo['cputype'] = "%x" % self.cputype
        pinfo['cpusubtype'] = "%x" % self.cpusubtype
        pinfo['real-uid'] = "%x" % (unsigned(self.proc.p_ruid))
        pinfo['real-gid'] = "%x" % (unsigned(self.proc.p_rgid))
        if str(kern.arch).find('arm') >= 0:
            pinfo['ostype'] = 'ios'
        return ''.join('%s:%s;' % (str(key), str(val)) for key, val in pinfo.items())

    def readMemory(self, address, size):
        """Read `size` bytes of the task's user memory at `address`.

        Data is served from the dynamic cache when present and saved to it
        after a successful read. A failed read is logged and whatever
        GetUserDataAsString returned is still passed to encodeByteString.
        """
        cache_key = "{}-{}-{}".format(hex(self.task), hex(address), size)
        cache_data = caching.GetDynamicCacheData(cache_key)
        if cache_data:
            return self.encodeByteString(cache_data)
        data = GetUserDataAsString(self.task, address, size)
        if not data:
            logging.error("Failed to read memory task:{: <#018x} {: <#018x} {:d}".format(self.task, address, size))
        else:
            caching.SaveDynamicCacheData(cache_key, data)
        return self.encodeByteString(data)

    def getSharedLibInfoAddress(self):
        """Return the task's all_image_info_addr as an unsigned integer."""
        return unsigned(self.task.all_image_info_addr)