强网杯S9
tradre #
请将最终得到的正确输入放入 'flag{ }' 中提交。
主函数可以看到 fork 和基于 SIGALRM 的反调试,限制执行时间 60 秒,先将其修补
父进程调试自己的 fork,在收到 SIGCHLD 的时候检查是否为 int3 指令,如果是,则根据当前所处的基本块决定接下来的控制流走向,如图
其中,基本块结构体的组成如下
/*
 * Descriptor of one basic block in the int3-flattened child process.
 * Four pointer-sized fields; the recovery script reads each descriptor
 * as 32 bytes.
 */
typedef struct basic_block_stru_t basic_block_stru_t;

struct basic_block_stru_t {
    /* Successor taken on the "false" outcome (state-transfer result 0);
     * also the block execution continues at after a call. */
    basic_block_stru_t *false_branch;
    union {
        /* Successor taken on the "true" outcome (state-transfer result 1),
         * or the callee's descriptor for an internal call (result 4). */
        basic_block_stru_t *true_branch;
        /* Raw address of the callee for a library call (result 3). */
        void *call_addr;
    } u;
    /* Returns the transition kind: 0/1 = jcc false/true, 2 = ret,
     * 3 = library call, 4 = internal call, 5 = switch. */
    int (*basic_block_state_transfer_func)();
    /* Address of the basic block's code in the child. */
    void *basic_block_addr;
};
分支指针指向其它基本块结构体的地址,状态转移函数返回一个数字,为 2 时,是作者通过额外维护一个栈实现的 ret 指令;为 4 时,是作者实现的 call 指令;为 3 时,是作者实现的兼容原版的 call 指令,用于调用库函数;1 和 0 分别代表进入 jcc 的真假分支;而 5 是作者实现的 switch 语句,可以根据这里的实现转储出所有的基本块结构体以及地址和它们的对应关系
由于子进程通过 int3 指令进行控制流转移,其控制流图是一条直线
转储结构体后,考虑修补 int3 来恢复原有的控制流
首先根据结构体之间的后继关系,对基本块进行拓扑排序
深搜时,将 ret 视为叶子节点,忽略 call 的目标分支
然后将 int3 替换为原来的指令;如果拓扑序中紧随其后的基本块不是该基本块原本的后继(假分支目标),则在其后额外添加一条 jmp 跳转到原后继
最后,重定位 RIP 寻址和所有跳转指令
观测到的 jcc 有 jle, jne, je, jns, jg 和 jl,脚本如下
from pwn import *
from capstone import *
from keystone import *
import re
# x86-64 disassembler (capstone) and assembler (keystone) used throughout.
md = Cs(CS_ARCH_X86, CS_MODE_64)
ks = Ks(KS_ARCH_X86, KS_MODE_64)

# Raw dump of the basic-block descriptor table taken from the process.
with open("dump.bin", "rb") as fp:
    buffer = fp.read()

# Virtual address of the first descriptor in the dump.
stru_base = 0x606AC0

# Addresses of the state-transfer handlers found in the descriptors, named
# after the instruction each one emulates.
CALL = 0x401EB4  # handler returns 4: emulated call
EXT_CALL = 0x401EA5  # handler returns 3: call to a library function
JMP = 0x0  # no handler stored in the descriptor; treated as a plain jmp
JLE = 0x401CA6
JNE = 0x401D22
JE = 0x401D5B
RET = 0x401E96  # handler returns 2: emulated ret
JNS = 0x401DCD
JG = 0x401F0C
JL = 0x401C31

# Set of every handler address the script knows how to translate; anything
# else found in a descriptor is treated as an error further down.
state_transes = {
    CALL: True,
    EXT_CALL: True,
    JMP: True,
    JLE: True,
    JNE: True,
    JE: True,
    RET: True,
    JNS: True,
    JG: True,
    JL: True,
}

# All parsed descriptors in dump order, and the same objects keyed by the
# address of the descriptor itself.
bbs = []
bb_by_stru = dict()
class BB:
    """One parsed basic-block descriptor.

    Attributes:
        stru_address: address of the descriptor itself.
        false_branch: address of the descriptor for the false/fall-through
            successor.
        true_branch: address of the descriptor for the true successor (for a
            library call this slot holds the callee address instead).
        state_trans: address of the state-transfer handler.
        address: address of the basic block's code.
    """

    def __init__(
        self,
        stru_address: int,
        false_branch: int,
        true_branch: int,
        state_trans: int,
        address: int,
    ) -> None:
        self.stru_address = stru_address
        self.false_branch = false_branch
        self.true_branch = true_branch
        self.state_trans = state_trans
        self.address = address

    def __repr__(self) -> str:
        # Hex is what one compares against a disassembler listing.
        return (
            "BB(stru_address=0x%x, false_branch=0x%x, true_branch=0x%x, "
            "state_trans=0x%x, address=0x%x)"
            % (
                self.stru_address,
                self.false_branch,
                self.true_branch,
                self.state_trans,
                self.address,
            )
        )
# Parse the descriptor table: 181 records of 32 bytes, each holding four
# 64-bit fields (false branch, true branch / call target, state-transfer
# handler, block address).
STRU_SIZE = 32
STRU_COUNT = 181
for i in range(0, STRU_SIZE * STRU_COUNT, STRU_SIZE):
    stru = buffer[i : i + STRU_SIZE]
    if len(stru) != STRU_SIZE:
        raise ValueError(
            "dump.bin is truncated: descriptor at offset 0x%x is incomplete" % i
        )
    stru_address = stru_base + i
    false_branch = u64(stru[0:8])
    true_branch = u64(stru[8:16])
    st = u64(stru[16:24])
    address = u64(stru[24:32])
    # NOTE: to discover the handler set on a fresh binary, collect every
    # distinct `st` value seen here instead of relying on state_transes.
    bb = BB(stru_address, false_branch, true_branch, st, address)
    bbs.append(bb)
    if bb.stru_address in bb_by_stru:
        raise ValueError("duplicate descriptor address 0x%x" % bb.stru_address)
    bb_by_stru[bb.stru_address] = bb
# Blocks in the order they will be laid out, and the visited set keyed by
# block code address.
topo = []
bb_by_addr = dict()


def dfs(bb: BB):
    """Append `bb` and everything reachable from it to `topo` in pre-order.

    The successors followed depend on the block's state-transfer handler:
    a RET block is a leaf; CALL / EXT_CALL only continue at the false branch
    (the call target is ignored); JMP continues at the true branch; every
    conditional jump visits the false branch first and then the true branch,
    so the fall-through block tends to be placed right after its predecessor.

    Blocks are de-duplicated by code address via `bb_by_addr`.

    Raises:
        ValueError: if the block uses a handler missing from `state_transes`.
        KeyError: if a followed branch pointer is not a known descriptor.
    """
    if bb.address in bb_by_addr:
        return
    bb_by_addr[bb.address] = bb
    topo.append(bb)

    st = bb.state_trans
    if st not in state_transes:
        raise ValueError(
            "unknown state-transfer handler 0x%x for block 0x%x" % (st, bb.address)
        )
    if st == RET:
        return
    if st == CALL or st == EXT_CALL:
        dfs(bb_by_stru[bb.false_branch])
    elif st == JMP:
        dfs(bb_by_stru[bb.true_branch])
    else:
        # false first, then true
        dfs(bb_by_stru[bb.false_branch])
        dfs(bb_by_stru[bb.true_branch])
# Start a traversal from every descriptor not reached yet, so that blocks
# only reachable through (ignored) call targets are laid out as well.
for bb in bbs:
    if bb.address not in bb_by_addr:
        dfs(bb)
# Every visited code address must have produced exactly one layout entry.
assert len(bb_by_addr) == len(topo)
# Raw bytes of the flattened code region.
with open("code.bin", "rb") as fp:
    txt = fp.read()

# Address at which code.bin is disassembled; the rebuilt code is emitted
# starting at the same address.
base_addr = 0x4009F7
text_addr = 0x0 + base_addr

# Linear disassembly, indexed by instruction address so each basic block can
# be walked from its start address.
insns = md.disasm(txt, text_addr)
insn_dict = {insn.address: insn for insn in insns}

addr_map = {}  # original instruction address -> address in the rebuilt code
to_reloc = dict()  # rebuilt address of a branch/call -> its original target
recovered = bytearray()  # rebuilt machine code
# Handler address -> mnemonic of the instruction that replaces the int3 for
# calls and conditional jumps (RET and JMP are handled separately below).
_BRANCH_MNEMONICS = {
    CALL: "call",
    EXT_CALL: "call",
    JLE: "jle",
    JNE: "jne",
    JE: "je",
    JNS: "jns",
    JG: "jg",
    JL: "jl",
}
# Dummy far-away target used when first assembling a branch; the real target
# is patched in by the relocation pass (presumably chosen so the assembler
# emits a wide enough encoding -- the relocation pass only checks that the
# final encoding is not longer).
_PLACEHOLDER = 0xFFFF_FFFF
_RIP_REL = re.compile(r"rip \+ ([0-9a-zA-Z]+)")

# Emit every basic block in layout order. Ordinary instructions are copied
# (RIP-relative operands are re-encoded for their new position); the int3
# that terminates a block is replaced by the real control-flow instruction.
for i, bb in enumerate(topo):
    va = bb.address
    while True:
        insn = insn_dict[va]
        new_va = len(recovered) + text_addr
        addr_map[va] = new_va
        if insn.mnemonic == "int3":
            st = bb.state_trans
            assert st in state_transes
            if st == RET:
                insn_data = ks.asm("ret", va)[0]
                recovered.extend(insn_data)
            elif st == JMP:
                to_reloc[new_va] = bb_by_stru[bb.true_branch].address
                insn_data = ks.asm("jmp\t%s" % hex(_PLACEHOLDER), new_va)[0]
                recovered.extend(insn_data)
            else:
                if st == EXT_CALL:
                    # notice: for a library call the slot holds the callee
                    # address itself, not a descriptor pointer
                    dest = bb.true_branch
                else:
                    dest = bb_by_stru[bb.true_branch].address
                to_reloc[new_va] = dest
                mnemonic = _BRANCH_MNEMONICS.get(st)
                if mnemonic is None:
                    raise ValueError(
                        "no instruction known for handler 0x%x (block 0x%x)"
                        % (st, bb.address)
                    )
                insn_data = ks.asm(
                    "%s\t%s" % (mnemonic, hex(_PLACEHOLDER)), new_va
                )[0]
                recovered.extend(insn_data)

                # Calls and conditional jumps continue at the false branch.
                # If that block is not the next one in the layout, an
                # explicit jmp has to bridge the gap.
                assert bb.false_branch != 0
                default_dest = bb_by_stru[bb.false_branch].address
                if i + 1 >= len(topo) or default_dest != topo[i + 1].address:
                    jmp_new_va = new_va + len(insn_data)
                    to_reloc[jmp_new_va] = default_dest
                    insn_data = ks.asm(
                        "jmp\t%s" % hex(_PLACEHOLDER), jmp_new_va
                    )[0]
                    recovered.extend(insn_data)
                    print("inserted a new jmp at 0x%x" % jmp_new_va)
            break  # block finished; no need to update va
        else:
            # NOTE(review): only positive displacements ("rip + N") are
            # recognised; an operand printed as "rip - N" would be copied
            # unchanged -- confirm none occur in this binary.
            match = _RIP_REL.search(insn.op_str)
            if match is not None:
                print("%016x:\t%s\t%s" % (va, insn.mnemonic, insn.op_str))
                fake_rel = match.groups()[0]
                if fake_rel.startswith("0x"):
                    fake_rel = int(fake_rel, 16)
                else:
                    fake_rel = int(fake_rel)
                # Absolute target of the original operand, then the
                # displacement that reaches it from the new location.
                dest = fake_rel + insn.address + insn.size
                true_rel = dest - (new_va + insn.size)
                new_op_str = _RIP_REL.sub(f"rip + {hex(true_rel)}", insn.op_str)
                print(f"cooked new op_str {new_op_str}")
                insn_data = ks.asm(
                    "%s\t%s" % (insn.mnemonic, new_op_str), new_va
                )[0]
                # The re-encoded instruction must keep its size, otherwise
                # walking the original stream by this length would desync.
                assert len(insn_data) == insn.size
                recovered.extend(insn_data)
                va += len(insn_data)
            else:
                recovered.extend(insn.bytes)
                va += insn.size
# relocate: patch the real target into every branch/call that was assembled
# with a placeholder destination.
for insn in md.disasm(recovered, text_addr):
    va = insn.address
    if va not in to_reloc:
        # default behavior: do nothing
        continue

    idx = va - text_addr
    sz = insn.size
    mnemonic = insn.mnemonic
    op_str = insn.op_str
    # jmp/jcc or call
    assert mnemonic.startswith("j") or mnemonic == "call"
    print("%016x:\t%s\t%s" % (va, mnemonic, op_str))
    dest = to_reloc[va]
    # Targets inside the rebuilt code are translated to their new address;
    # anything else (e.g. a library-call target) is kept as is.
    true_dest = addr_map[dest] if dest in addr_map else dest
    new_op_str = hex(true_dest)
    print(f"cooked new op_str {new_op_str}")
    insn_data = ks.asm("%s\t%s" % (mnemonic, new_op_str), va)[0]
    # The final encoding may be shorter than the placeholder one; pad with
    # NOPs so the following instructions keep their addresses.
    assert len(insn_data) <= sz
    insn_data.extend([0x90] * (sz - len(insn_data)))
    recovered[idx : idx + sz] = insn_data
# Listing of the rebuilt code for manual inspection.
with open("disasm.txt", "w") as fp:
    for insn in md.disasm(recovered, text_addr):
        fp.write("%016x:\t%s\t%s\n" % (insn.address, insn.mnemonic, insn.op_str))
# (The raw rebuilt bytes can also be dumped on their own, e.g. to
# "recovered.bin", when only the code is needed.)

with open("tradre", "rb") as fp:
    buffer = bytearray(fp.read())

# 12 .text 00003fb2 0000000000400910 0000000000400910 00000910 2**4
TEXT_FILE_OFFSET = 0x910
TEXT_SIZE = 0x3FB2
IMAGE_BASE = 0x400000

# Blank the whole .text section with NOPs, then drop the rebuilt code in at
# the file offset corresponding to base_addr.
buffer[TEXT_FILE_OFFSET : TEXT_FILE_OFFSET + TEXT_SIZE] = b"\x90" * TEXT_SIZE
patch_offset = base_addr - IMAGE_BASE
if patch_offset + len(recovered) > TEXT_FILE_OFFSET + TEXT_SIZE:
    # The rebuilt code can grow (int3 -> multi-byte branch, extra jmps);
    # make it visible when it spills over the end of .text.
    print(
        "warning: rebuilt code ends 0x%x bytes past the end of .text"
        % (patch_offset + len(recovered) - (TEXT_FILE_OFFSET + TEXT_SIZE))
    )
buffer[patch_offset : patch_offset + len(recovered)] = recovered

with open("recovered.elf", "wb") as fp:
    fp.write(buffer)
恢复后的函数使用了 srand(0x10000) 制造假随机数掩盖了一个标准 AES 实现
AES 的密钥通过假随机生成,AES 的密文可以通过异或一些假随机数来恢复
import ctypes
# Replay the program's pseudo-random sequence with the C library's own
# generator so the values match what the binary computed.
libc = ctypes.CDLL("libc.so.6")
libc.srand(0x10000)

# The first 16 outputs, truncated to a byte each, form the AES key.
key = []
for _ in range(16):
    key.append(libc.rand() & 0xFF)

# The next output is used to re-seed the generator.
test = libc.rand()
libc.srand(test)
# 32-byte XOR mask as it appears in the binary (signed bytes), normalised to
# the unsigned range 0..255.
xor_key = [
    b & 0xFF
    for b in [-30, -117, 85, 56, 105, -6, 0x80, -62, 100, 78, 127, -25, 19, 6, 20,
              -59, -64, 19, -45, 18, 107, -67, -14, -57, -120, 68, 62, 9, -24, -93,
              -125, 48]
]

# Masked substitution table as stored in the binary.
sbox = [0x81, 0xF7, 0x22, 0x43, 0x9B, 0x91, 0xEF, 0x7, 0x54, 0x4F, 0x18, 0xCC,
        0xED, 0xD1, 0xBF, 0xB3, 0xA, 0x91, 0x1A, 0x6F, 0x91, 0xE4, 0xB5, 0x37,
        0x25, 0x90, 0x9C, 0xA6, 0x74, 0x7, 0xF1, 0xF0, 0x55, 0x76, 0xC6, 0x1E,
        0x5F, 0xC5, 0x77, 0xE, 0x50, 0xEB, 0x9A, 0x16, 0x62, 0xDE, 0x25, 0xD0, 0xC4,
        0xD4, 0xF0, 0xD1, 0x73, 0x2B, 0xF7, 0x5D, 0x8F, 0x56, 0xBE, 0xEB, 0x3, 0x84,
        0x31, 0x45, 0xEB, 0x8, 0x79, 0x22, 0x72, 0x94, 0xDA, 0x62, 0x36, 0x75,
        0xA9, 0x54, 0x3A, 0xE5, 0x3B, 0x41, 0x93, 0xC2, 0xD3, 0xFF, 0x4B, 0x41,
        0x43, 0x9C, 0xE2, 0x8F, 0x80, 0x30, 0xA2, 0xEF, 0xDB, 0xFF, 0x32, 0x64,
        0xFF, 0xC3, 0x2A, 0xB7, 0xB3, 0x47, 0x21, 0xB7, 0x7D, 0x98, 0x43, 0x3A,
        0x8B, 0x6D, 0x91, 0xB0, 0x93, 0x9D, 0xF9, 0x20, 0xCA, 0x32, 0x34, 0xF2,
        0xE4, 0x28, 0xF8, 0x5C, 0x70, 0xE2, 0x2F, 0x87, 0x46, 0xD4, 0x36, 0x6D,
        0xC4, 0xD5, 0xA0, 0xE9, 0x1, 0xDA, 0x77, 0x5B, 0xD, 0xB6, 0xA0,
        0x92, 0x9C, 0xCE, 0x49, 0x97, 0x62, 0x4F, 0xCE, 0xAA, 0x86, 0x1D, 0x36,
        0xFD, 0x88, 0xEB, 0x2, 0xB9, 0x6F, 0x32, 0x20, 0xFC, 0xA4, 0x9E,
        0xA6, 0x9D, 0xD3, 0x85, 0x82, 0x93, 0xF0, 0xBC, 0x27, 0xDB, 0xE4, 0x7F,
        0xE6, 0x68, 0xBC, 0x6E, 0xE4, 0x12, 0xCA, 0xE3, 0x8D, 0xD9, 0x2D, 0x38,
        0x58, 0xF3, 0x70, 0x16, 0x75, 0x5C, 0x34, 0x4, 0x8C, 0x93, 0xB, 0xF8,
        0x58, 0xBB, 0x9F, 0x4F, 0xB0, 0x2D, 0x66, 0x74, 0x23, 0xBE,
        0x4, 0xC9, 0xE9, 0x71, 0x69, 0xB0, 0x6E, 0x62, 0x9E, 0xAE, 0x3, 0x73, 0xCD,
        0x29, 0x0, 0x23, 0xE, 0x56, 0xFF, 0x50, 0xF8, 0xE, 0xDD, 0x53, 0x3C, 0x1A,
        0x4C, 0xB2, 0x5A, 0x1F, 0xD4, 0x5B, 0xB0, 0xAF, 0xC9, 0xDD, 0x13, 0x6,
        0x58, 0xF7, 0x38, 0x26]

# Remove the mask: entry i is XORed with xor_key[i % 32]. The result is the
# table the cipher actually uses.
sbox = [value ^ xor_key[i % 32] for i, value in enumerate(sbox)]
from Crypto.Cipher import AES

prompt = b"Congratulations! This is the correct flag!"

# Rebuild the 32-byte ciphertext: each byte is the next pseudo-random byte
# XORed with the mask byte and with a byte of the success message.
ct = []
for mm in range(16):
    x = libc.rand() & 0xFF
    ct.append(x ^ xor_key[mm] ^ prompt[mm])
for nn in range(16):
    x = libc.rand() & 0xFF
    # NOTE(review): the second half reads prompt[17..32], skipping index 16;
    # kept exactly as written -- presumably mirrors the binary.
    ct.append(x ^ xor_key[nn + 16] ^ prompt[nn + 17])

key = bytes(key)
aes = AES.new(key, AES.MODE_ECB)
# Print the plaintext so the script produces output when run non-interactively.
print(aes.decrypt(bytes(ct)))
# 1629d52128ca395e4f6a0fc98712a3e1
butterfly #
F5,逻辑比较清晰,将待加密文件分成 64 比特的块,每个块先与 64 比特的密钥逐字节 XOR,再交换其中的奇偶字节,然后循环左移 1 位,最后逐字节和密钥对应字节相加
实现方式是 SIMD 指令,包括 pxor, psllq, psrlq, por, paddb 等,密钥为 MMXEncod
from pwn import *
# 8-byte key used by the encoder, one key byte per byte of each 64-bit block.
key = b"MMXEncod"

# Encrypted input produced by the challenge binary.
with open("encode.dat", "rb") as fp:
    buffer = fp.read()
def packed_decrypt(x, round_key=None):
    """Decrypt one 8-byte block and return the plaintext as a list of ints.

    Encryption XORs the block with the key byte by byte, swaps each pair of
    adjacent bytes, rotates the 64-bit little-endian value left by one bit
    and finally adds the key byte by byte. This function applies the inverse
    steps in reverse order.

    Args:
        x: sequence of exactly 8 byte values (0..255). It is not modified.
        round_key: at least 8 key bytes; defaults to the module-level `key`.

    Returns:
        A new list of 8 ints holding the decrypted bytes.

    Raises:
        ValueError: if `x` is not 8 bytes long or the key is shorter than 8.
    """
    if round_key is None:
        round_key = key
    if len(x) != 8:
        raise ValueError("block must be exactly 8 bytes, got %d" % len(x))
    if len(round_key) < 8:
        raise ValueError("key must provide at least 8 bytes")

    # Undo the byte-wise addition of the key.
    unsummed = bytes((x[i] - round_key[i]) & 0xFF for i in range(8))
    # Undo the 1-bit left rotation with a 1-bit right rotation.
    word = int.from_bytes(unsummed, "little")
    word = ((word >> 1) | (word << 63)) & 0xFFFF_FFFF_FFFF_FFFF
    out = list(word.to_bytes(8, "little"))
    # Undo the swap of adjacent (even, odd) bytes.
    for i in range(0, 8, 2):
        out[i], out[i + 1] = out[i + 1], out[i]
    # Undo the byte-wise XOR with the key.
    return [out[i] ^ round_key[i] for i in range(8)]
# Decrypt the file block by block; a trailing chunk shorter than 8 bytes is
# copied through unchanged.
flag = []
for p in range(0, len(buffer), 8):
    m64 = list(buffer[p : p + 8])
    if len(m64) < 8:
        flag.extend(m64)
    else:
        flag.extend(packed_decrypt(m64))
print(bytes(flag))
# flag{butter_fly_mmx_encode_7778167}
复盘 #
PolyEncryption #
I love many languages what should i choose to create a reverse challenge? How about all of it :P
WIP
ABabyChal #
一个简单的挑战
flag 长度 38,md5 为 7a2028696ca643a57ddeda6642f781ae
WIP
iked #
WIP
总结 #
WIP