手把手教你写Windows 64位平台调试器
2020-12-13 05:07
标签:des style blog http color 使用 本文网页排版有些差,已上传了doc,可以下载阅读。本文中的所有代码已打包,下载地址在此。 -------------------------------------------------------------------------------------------------------------------------------------------------------------- 手写一个调试器有助于我们理解hook、进程注入等底层黑客技术具体实现,在编写过程中需要涉及大量Windows内核编程知识,因此手写调试器也可以作为启发式学习内核编程的任务驱动。(本文中代码大量参考《Gray hat python》(Python灰帽子),此书中详细讲解了调试器的基本原理,因此本文假设读者已具备基本调试技能,能理解调试器大致原理以及断点原理,读者在不理解本文内容时可参考此书,不过《Gray hat python》中的代码是32位的,并且有不少错误,笔者花了一周的时间调试修复了书中的bug,并且将其修改成了64位版本,本文旨在总结该书中第三章的内容并提供一份64位版本的代码,该代码在本人64位Win7,i3-2310M CPU主机上能成功运行) 首先来看看一个调试器需要实现哪些基本需求: 完成这些需求都需要调用kernel32.dll里的函数,这些函数都是用C写成的,在MSDN里可以查到这些函数的官方文档, 因此用C语言调用它们会更方便,但是C语言的开发效率不高,我们选择用Python来编写调试器。 映射C语言数据类型 在调用kernel32.dll里的函数时需要传入C语言中的数据类型声明的变量和结构体,Python提供ctypes模块来与C语言对接,C语言里的数据类型都可以映射到Python里,因此先让我们从映射数据类型着手开始我们的调试器编写。 上图展示了C语言中各数据类型在ctypes中对应的类型,而调用kernel32.dll需要使用微软自己宏定义的数据类型,这些数据类型实际上就是C语言中的基本数据类型。 建立Python工程my_debugger,再创建一个包main_package,在main_package下新建文件my_debugger_defines.py,该文件主要用来储存数据类型和结构体的定义,输入以下代码: 初步构建调试器 新建文件my_debugger.py,我们用该文件实现调试器。输入以下代码: 我们调用LoadLibrary函数将kernel32.dll装载进来,然后就可以用kernel32来引用它了。 接下来我们要开始构造调试器,我们将调试器的功能封装进一个类,用h_process来保存调试器附加的进程的进程句柄,用pid来保存目标进程的pid,用debugger_active来作为调试器是否启动的标志,我们在__init__里声明这三个变量: 接下来考虑我们的第一个需求:以调试级状态启动目标进程,或附加在目标进程上,监听调试事件。 以调试级状态启动目标进程 进程运行分调试级状态和非调试级状态,当进程为调试级状态,触发调试事件或是抛出异常时,操作系统会将该进程挂起,并通知附加它的调试进程。 我们编写一个load函数来让我们的调试器以调试级状态启动目标进程,这样我们就能用debugger监听目标进程的调试事件了。kernel32.dll提供CreateProcessA函数来创建进程,有关该函数的信息请自行查阅MSDN。调用CreateProcessA需要用到两个关键参数: 在my_debugger_defines.py里添加DEBUG_PROCESS的声明: 继续映射两个结构体: kernel32提供OpenProcess来为指定的pid打开进程,返回句柄,我们写一个函数来封装它: 首先定义PROCESS_ALL_ACCESS: 接下来编写open_process: 接下来在my_debugger.py中编写load函数: 该代码调用CreateProcessA创建一个进程,并且成功后打印进程的pid,调用open_process打开该进程的句柄并保存。至于下面三行代码的意思请自行MSDN: 至此我们初步实现了需求1的第一个功能:以调试状态启动目标进程。你可以创建一个debugger,像这样来测试它debugger.load(‘C:\Windows\System32\calc.exe’)。 有时我们需要将调试器附加在已启动的进程上,因此我们还需要编写一个attach(pid)。 kernel32提供DebugActiveProcess来将本进程附加在指定pid的进程上: 值得注意的是,pid必须是整数,如果你用raw_input来输入pid的话,记得把它转换成整数。 我们现在只剩下最后一个功能(监听调试事件)就能完成需求1了。 监听调试事件 Kernel32.dll提供WaitForDebugEvent来监听调试事件,当目标进程发生调试事件时会通知我们的调试器进行处理,我们用一个循环不断调用此函数来在处理完一个调试事件后立即监听下一个调试事件。 只要调试器是启动的(self.debugger_active == True),则不断获取调试事件。 编写get_debug_event(),调用WaitForDebugEvent需要用DEBUG_EVENT结构体来保存调试事件信息,我们把它映射进来: 该结构体需要用到联合体_DEBUG_EVENT_UNION,我们也把它映射进来: 该联合体实际上包含许多成员,但我们的调试器只需要用到EXCEPTION_DEBUG_INFO一个就足够了。 接下来就可以开始编写get_debug_event了: 其中: 我们在获取到调试事件后直接调用ContinueDebugEvent来使挂起的目标继续执行。 笔者在WaitForDebugEvent监听到一个调试事件后曾将debug_event保存下来,结果导致程序出现了一个非常诡异的bug,调试了很久才发现WaitForDebugEvent并不会等debug_event的所有成员变量释放锁后才通知调试进程,因此千万不要对debug_event进行操作,否则会导致死锁发生。 现在我们的调试器已完成了需求1,你可以在监测到调试事件后添加输出一些信息的代码,attach到一个计算器上测试一下。 设置断点 作为一个调试器,最重要的功能就是打断点。首先来实现最简单的软断点。 软断点 软断点就是int3断点,当程序执行到int3指令时会触发一个异常中断下来,并查看是否有调试器附加在该程序上,如果有,则交给调试进程处理。因此打软断点就是将我们要中断的地址的字节修改为’\xCC’(int3指令),在断下来后将该字节改回去,让程序正常执行。 首先要实现的是读取内存和修改内存: 这样我们就能将要打断点的地址的内容修改成’\xCC’了。 我们用self.breakpoints来保存软断点,在__init__中添加: 然后编写bp_set来打软断点: 该函数先检查断点字典中是否有该地址,如果没有,则记录该地址首字节,并修改成’\xCC’,将其添加进self.breakpoints字典。 硬件断点 软断点最大的缺点是需要修改进程内存,这会破坏CRC,因此软断点是极其容易被反调试技术Anti的。硬件断点由于只修改寄存器,因此不容易被目标进程察觉。 硬件断点是用8个调试寄存器DR0-DR7实现的。其中DR0-DR3用来储存断点地址,DR4-DR5保留,DR6是调试状态寄存器,在进程触发硬件断点时返回触发的是哪一个断点给调试器,DR7是调试控制寄存器。 打硬件断点首先需要在DR0-DR3中找一个空闲的寄存器,将断点地址写进去,然后修改DR7相应标志位来设置断点长度和断点条件。 断点条件有三个:读、写、执行。分别表示在读该地址、写该地址、执行该地址的时候中断: 我们还需要知道断点的长度才能判断是否应该中断,断点长度也有三个选择:1字节、2字节、4字节。 如何修改寄存器呢?操作系统为每一个线程维护了一个结构体来保存上下文,当线程中断时,操作系统会将所有寄存器放进该结构体里保存起来,当线程恢复执行时将该结构体取出,恢复寄存器的值。因此我们可以通过修改这个结构体来实现修改寄存器的目的。 线程上下文结构体如下: 注意,在《Gay hat python》一书中所使用的线程上下文是32位的,如果你在64位平台下使用32位的结构体来保存线程上下文,将会得到一个寄存器值全为0的空的线程上下文。 接下来编写一个用来获取线程上下文的函数,根据MSDN,在调用kernel32.GetThreadContext前需要对结构体进行初始化: 虽然是64位,但是我用kernel32.GetThreadContext来获取线程上下文却并没任何问题,反倒是调用kernel32.Wow64GetThreadContext却会发生87号错误(参数不正确),我试了很久也没弄清楚为什么,如果有大神知道这是什么情况请联系我,谢谢! 通常硬件断点都是针对整个进程的,因此我们需要对目标进程中的所有线程逐一修改线程上下文,这就涉及到一个枚举线程的问题,kernel32仍然提供API帮助我们做这件事。每个进程都保存了一张线程快照表来保存所有线程的状态信息,有了这张表我们可以利用kernel32.Thread32First获取到第一个线程,先后调用kernel32.Thread32Next就能继续遍历线程了。 保存线程信息的结构体和获取线程快照表所需的常量参数如下所示: 枚举线程的函数如下: 注意,《Gay hat python》源码中此处有bug,以上代码已将其修复。 现在我们就可以开始编写打硬件断点的函数了。我们用self.hardware_breakpoints来保存硬件断点,然后枚举线程,逐一修改调试寄存器。 怎么修改调试寄存器呢?对于DR0-DR3,我们可以简单地找一个空闲的寄存器,将我们要打断点的地址写进去即可。对于DR7,需要仔细研究标志位构造。注意,我们是在64位平台下,因此寄存器是64位的,《Gay hat python》中修改DR7的代码是32位的,那么我们的代码是否应与此书不同呢?实际上,64位的DR6和DR7的高32位是用不到的,低32位构造与32位寄存器完全一致,因此此书上的代码在64位环境下兼容。 我们看看64位的DR6与DR7的构造:
DR7的0、2、4、6位代表DR0、DR1、DR2、DR3,置1表示此寄存器被打上了硬件断点。16、20、24、28位分别保存DR0、DR1、DR2、DR3的硬件断点的条件,18、22、26、30位分别保存DR0、DR1、DR2、DR3的硬件断点的长度。 举个例子,如果我们要打一个0x77284地址的内存长度为1,条件为执行的硬件断点,该怎么修改寄存器呢。首先在DR0-DR3中找一个空闲的寄存器(假设是DR2)赋值为0x77284,接着将DR7的第24位赋值为HW_EXCUTE,将26位赋值为0(长度减1)。 现在可以开始写硬件断点了:
1 from ctypes import *
2
3 BYTE = c_ubyte
4 WORD = c_ushort
5 DWORD = c_ulong
6 LPBYTE = POINTER(c_ubyte)
7 LPTSTR = POINTER(c_char)
8 HANDLE = c_void_p
9 PVOID = c_void_p
10 ULONG_PTR = POINTER(c_ulong)
11 LPVOID = c_void_p
12 UINT_PTR = c_ulong
13 SIZE_T = c_ulong
14 DWORD64 = c_uint64
1 from ctypes import *
2 from main_package.my_debugger_defines import *
3
4 kernel32 = windll.LoadLibrary("kernel32.dll")
1 class debugger():
2
3 def __init__(self):
4 self.h_process = None
5 self.pid = None
6 self.debugger_active = False
1 DEBUG_PROCESS = 0X00000001
1 class STARTUPINFO(Structure):
2 _fields_ = [
3 ("cb", DWORD),
4 ("lpReserved", LPTSTR),
5 ("lpDesktop", LPTSTR),
6 ("lpTitle", LPTSTR),
7 ("dwX", DWORD),
8 ("dwY", DWORD),
9 ("dwXSize", DWORD),
10 ("dwYSize", DWORD),
11 ("dwXCountChars", DWORD),
12 ("dwYCountChars", DWORD),
13 ("dwFillAttribute", DWORD),
14 ("dwFlags", DWORD),
15 ("wShowWindow", WORD),
16 ("cbReserved2", WORD),
17 ("lpReserved2", LPBYTE),
18 ("hStdInput", HANDLE),
19 ("hStdOutput", HANDLE),
20 ("hStdError", HANDLE)
21 ]
22
23 class PROCESS_INFORMATION(Structure):
24 _fields_ = [
25 ("hProcess", HANDLE),
26 ("hThread", HANDLE),
27 ("dwProcessId", DWORD),
28 ("dwThreadId", DWORD)
29 ]
1 PROCESS_ALL_ACCESS = 0X1F0FFF
1 1 #get process handle
2 2 def open_process(self,pid):
3 3 h_process = kernel32.OpenProcess(PROCESS_ALL_ACCESS,False,pid)
4 4 return h_process
1 def load(self, path_to_exe):
2
3 creation_flags = DEBUG_PROCESS
4
5 startupinfo = STARTUPINFO()
6 process_information = PROCESS_INFORMATION()
7
8 startupinfo.dwFlags = 0X1
9 startupinfo.wShowWindow = 0X0
10 startupinfo.cb = sizeof(startupinfo)
11
12 if kernel32.CreateProcessA(path_to_exe,
13 None,
14 None,
15 None,
16 None,
17 creation_flags,
18 None,
19 None,
20 byref(startupinfo),
21 byref(process_information)):
22 print "[*] We have successfully launched the process!!"
23 print "[*] PID: %d" % process_information.dwProcessId
24 self.h_process = self.open_process(process_information.dwProcessId) #keep a process handle
25 self.debugger_active = True
26 else:
27 print "[*] Error: 0x%08x." % kernel32.GetLastError()
1 startupinfo.dwFlags = 0X1
2 startupinfo.wShowWindow = 0X0
3 startupinfo.cb = sizeof(startupinfo)
1 def attach(self,pid):
2 self.h_process = self.open_process(pid)
3 if kernel32.DebugActiveProcess(pid):
4 self.debugger_active = True
5 self.pid = int(pid)
6 else:
7 print "[*] Unable to attach the process."
1 def run(self):
2 while self.debugger_active == True:
3 self.get_debug_event()
1 class DEBUG_EVENT(Structure):
2 _fields_ = [
3 ("dwDebugEventCode", DWORD),
4 ("dwProcessId", DWORD),
5 ("dwThreadId", DWORD),
6 ("u", _DEBUG_EVENT_UNION)
7 ]
1 class _DEBUG_EVENT_UNION(Union):
2 _fields_ = [
3 ("Exception", EXCEPTION_DEBUG_INFO),
4 ]
1 class EXCEPTION_DEBUG_INFO(Structure):
2 _fields_ = [
3 ("ExceptionRecord", EXCEPTION_RECORD),
4 ("dwFirstChance", DWORD)
5 ]
6 class EXCEPTION_RECORD(Structure):
7 Pass
8 EXCEPTION_RECORD._fields_ = [
9 ("ExceptionCode", DWORD),
10 ("ExceptionFlags", DWORD),
11 ("ExceptionRecord", POINTER(EXCEPTION_RECORD)),
12 ("ExceptionAddress", PVOID),
13 ("NumberParameters", DWORD),
14 ("ExceptionInformation", UINT_PTR * 15),
15 ]
1 def get_debug_event(self):
2
3 debug_event = DEBUG_EVENT()
4 continue_status = DBG_CONTINUE
5 if kernel32.WaitForDebugEvent(byref(debug_event), INFINITE):
6 kernel32.ContinueDebugEvent(debug_event.dwProcessId,debug_event.dwThreadId, continue_status )
1 INFINITE = 0xFFFFFFFF
2 DBG_CONTINUE = 0X00010002
1 def read_process_memory(self,address,length):
2 data = ""
3 read_buf = create_string_buffer(length)
4 count = c_ulong(0)
5 if not kernel32.ReadProcessMemory(self.h_process,
6 address,
7 read_buf,
8 length,
9 byref(count)):
10 return False
11 else:
12 data += read_buf.raw
13 return data
14
15 def write_process_memory(self,address,data):
16 count = c_ulong(0)
17 length = len(data)
18 c_data = c_char_p(data[count.value:])
19 if not kernel32.WriteProcessMemory(self.h_process,
20 address,
21 c_data,
22 length,
23 byref(count)):
24 return False
25 else:
26 return True
1 self.breakpoints = {}
1 def bp_set(self,address):
2 print "[*] Setting breakpoint at: 0x%08x" % address
3 if not self.breakpoints.has_key(address):
4 try:
5 original_byte = self.read_process_memory(address, 1)
6 self.write_process_memory(address, ‘\xCC‘)
7 self.breakpoints[address] = original_byte
8 except:
9 return False
10 return True
1 HW_ACCESS = 0x00000003
2 HW_EXECUTE = 0x00000000
3 HW_WRITE = 0x00000001
class WOW64_CONTEXT(Structure):
_pack_ = 16
_fields_ = [
("P1Home", DWORD64),
("P2Home", DWORD64),
("P3Home", DWORD64),
("P4Home", DWORD64),
("P5Home", DWORD64),
("P6Home", DWORD64),
("ContextFlags", DWORD),
("MxCsr", DWORD),
("SegCs", WORD),
("SegDs", WORD),
("SegEs", WORD),
("SegFs", WORD),
("SegGs", WORD),
("SegSs", WORD),
("EFlags", DWORD),
("Dr0", DWORD64),
("Dr1", DWORD64),
("Dr2", DWORD64),
("Dr3", DWORD64),
("Dr6", DWORD64),
("Dr7", DWORD64),
("Rax", DWORD64),
("Rcx", DWORD64),
("Rdx", DWORD64),
("Rbx", DWORD64),
("Rsp", DWORD64),
("Rbp", DWORD64),
("Rsi", DWORD64),
("Rdi", DWORD64),
("R8", DWORD64),
("R9", DWORD64),
("R10", DWORD64),
("R11", DWORD64),
("R12", DWORD64),
("R13", DWORD64),
("R14", DWORD64),
("R15", DWORD64),
("Rip", DWORD64),
("DebugControl", DWORD64),
("LastBranchToRip", DWORD64),
("LastBranchFromRip", DWORD64),
("LastExceptionToRip", DWORD64),
("LastExceptionFromRip", DWORD64),
("DUMMYUNIONNAME", DUMMYUNIONNAME),
("VectorRegister", M128A * 26),
("VectorControl", DWORD64)
]
class DUMMYUNIONNAME(Union):
_fields_=[
("FltSave", XMM_SAVE_AREA32),
("DummyStruct", DUMMYSTRUCTNAME)
]
class DUMMYSTRUCTNAME(Structure):
_fields_=[
("Header", M128A * 2),
("Legacy", M128A * 8),
("Xmm0", M128A),
("Xmm1", M128A),
("Xmm2", M128A),
("Xmm3", M128A),
("Xmm4", M128A),
("Xmm5", M128A),
("Xmm6", M128A),
("Xmm7", M128A),
("Xmm8", M128A),
("Xmm9", M128A),
("Xmm10", M128A),
("Xmm11", M128A),
("Xmm12", M128A),
("Xmm13", M128A),
("Xmm14", M128A),
("Xmm15", M128A)
]
class XMM_SAVE_AREA32(Structure):
_pack_ = 1
_fields_ = [
(‘ControlWord‘, WORD),
(‘StatusWord‘, WORD),
(‘TagWord‘, BYTE),
(‘Reserved1‘, BYTE),
(‘ErrorOpcode‘, WORD),
(‘ErrorOffset‘, DWORD),
(‘ErrorSelector‘, WORD),
(‘Reserved2‘, WORD),
(‘DataOffset‘, DWORD),
(‘DataSelector‘, WORD),
(‘Reserved3‘, WORD),
(‘MxCsr‘, DWORD),
(‘MxCsr_Mask‘, DWORD),
(‘FloatRegisters‘, M128A * 8),
(‘XmmRegisters‘, M128A * 16),
(‘Reserved4‘, BYTE * 96)
]
class M128A(Structure):
_fields_ = [
("Low", DWORD64),
("High", DWORD64)
]
1 # Context flags for GetThreadContext()
2 CONTEXT_FULL = 0x00010007
3 CONTEXT_DEBUG_REGISTERS = 0x00010010
4
5 #get thread context
6 def get_thread_context(self, thread_id):
7
8 #64-bit context
9 context64 = WOW64_CONTEXT()
10 context64.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS
11
12 self.h_thread = self.open_thread(thread_id)
13 if kernel32.GetThreadContext(self.h_thread, byref(context64)):
14 kernel32.CloseHandle(self.h_thread)
15 return context64
16 else:
17 print ‘[*] Get thread context error. Error code: %d‘ % kernel32.GetLastError()
18 return False
1 class THREADENTRY32(Structure):
2 _fields_ = [
3 ("dwSize", DWORD),
4 ("cntUsage", DWORD),
5 ("th32ThreadID", DWORD),
6 ("th32OwnerProcessID", DWORD),
7 ("tpBasePri", DWORD),
8 ("tpDeltaPri", DWORD),
9 ("dwFlags", DWORD),
10 ]
11
12 TH32CS_SNAPTHREAD = 0x00000004
1 # enumerate threads
2 def enumerate_threads(self):
3 thread_entry = THREADENTRY32()
4 thread_list = []
5 snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, self.pid)
6 if snapshot is not None:
7 thread_entry.dwSize = sizeof(thread_entry)
8 success = kernel32.Thread32First(snapshot, byref(thread_entry))
9 while success:
10 if thread_entry.th32OwnerProcessID == self.pid:
11 thread_list.append(thread_entry.th32ThreadID)
12 kernel32.CloseHandle(snapshot)
13 success = kernel32.Thread32Next(snapshot, byref(thread_entry))
14 return thread_list
15 else:
16 return False
1 def bp_set_hw(self, address, length, condition):
2 if length not in (1,2,4):
3 return False
4 else:
5 length -= 1
6 if condition not in (HW_ACCESS, HW_EXECUTE, HW_WRITE):
7 return False
8 if not self.hardware_breakpoints.has_key(0):
9 available = 0
10 elif not self.hardware_breakpoints.has_key(1):
11 available = 1
12 elif not self.hardware_breakpoints.has_key(2):
13 available = 2
14 elif not self.hardware_breakpoints.has_key(3):
15