ollvm

ollvm介绍

OLLVM中有三种保护方案:BCF(Bogus Control Flow,中文名虚假控制流)、FLA(Control Flow Flattening,中文名控制流平坦化)、SUB(Instructions Substitution,中文名指令替换)

这些保护方案就是字面意思,下面给出他们的加了保护之后ida显示的代码块的截图就可以大概了解了

未混淆版:

image

FLA版:

image-1677367269728

BCF版:

image-1677367465373

SUB版:

image-1677367472307

全开:

image-1677367480229

ollvm环境搭建

ollvm其中最影响分析的当属控制流平坦化fla了,在解开fla之前得先了解一下,这个程序是如何编译出来的。我这里就用的下面的链接里的方法编译出能可以ollvm的clang编译器
https://www.jianshu.com/p/9136f7257e46

这里我用跟链接里相同的环境,只是在

image-1677367586353

修改代码时,得把char换成uint8_t。

编译完成后就可以看到在bin目录下看到clang的可执行文件

image-1677367488233

这时就有一个问题了,为什么这里编译后的clang能进行ollvm处理代码?

这里简单说一下clang+llvm的编译器组合,注意这里说的llvm与ollvm不一样,一个现代编译器的结构如下:

image-1677367499365

之所以能产生上面图中的代码是因为在代码优化部分进行了处理,当编译器前端分析完写好的代码就会产生叫LLVM IR(Intermediate Representation)的中间代码,产生的中间代码会在代码优化部分进行优化。

具体是怎么优化的呢?

LLVM Pass是代码优化过程中的一个重要部分,LLVM pass就是代码优化中的一个个模块,在代码优化的过程中,就像流水线一样呈线性排列的对生成的中间语言LLVM IR进行优化,第一个pass处理完就会交给下一个pass,直到完成代码的优化。

详细的过程可以看下面这个文章

https://kiprey.github.io/2020/06/LLVM-IR-pass/#简介

clang相关命令

基本命令

代码→LLVM IR

clang -S -emit-llvm hello.cpp -o hello.ll

clang -c -emit-llvm hello.cpp -o hello.bc

这两种后缀的都是IR中间原因其中.ll文件是可以阅读的文本而.bc文件则是二进制的机器码,它们作用都一样,都是用来进行优化的中间代码,只是存在状态不同。

LLVM IR→(优化后)LLVM IR

opt -load LLVMObfuscator.so -S hello.ll -o hello_opt.ll

-load加载指定的Pass集合

-S输出LLVM IR而不是LLVM字节码

LLVM IR→可执行文件

clang hello_opt.ll -o hello

ollvm混淆指令

查看所有pass集合,值得注意的是用load产生加载的集合与这里直接显示的集合不同的是,这里显示的显示的集合可以不用load参数就可以调用

opt -load -help

clang的-mllvm的参数意思是在代码进行优化的时候要执行的命令参数,也就是要执行的opt的命令

虚假控制流(Bogus Control Flow)

  • mllvm -bcf : 激活虚假控制流
  • mllvm -bcf_loop=3 : 混淆次数,这里一个函数会被混淆3次,默认为 1
  • mllvm -bcf_prob=40 : 每个基本块被混淆的概率,这里每个基本块被混淆的概率为40%,默认为 30 %
clang -mllvm -bcf -mllvm -bcf_loop=3 -mllvm -bcf_prob=40 hello.cpp -o hello_bcf

控制流平坦化(Control Flow Flattening)

  • mllvm -fla : 激活控制流平坦化
  • mllvm -split : 激活基本块分割
  • mllvm -split_num=3 : 指定基本块分割的数目
clang -mllvm -fla -mllvm -split -mllvm -split_num=3 hello.cpp -o hello_fla

指令替换(Instruction Substitution)

  • mllvm -sub : 激活指令替代
  • mllvm -sub_loop=3 : 混淆次数,这里一个函数会被混淆3次,默认为 1次
clang -mllvm -sub -mllvm -sub_loop=3 hello.cpp -o hello_sub

去混淆

angr版

image-1677367661195

脚本来源https://github.com/cq674350529/deflat

进行了一点修改,修复了一些bug吧,因为每个混淆后的无用的代码块不能确定大小,所以每次运行的时候要进行一定的调整。

当能成功找到所有后继块之后脚本就算成功运行了,注意下面的代码并不能直接执行,这里贴在这里是为了便于观看,核心逻辑都在下面的代码里。

脚本的运行原理:

  • 首先确定代码的开始和结束位置,然后交给angr分析出CFG图,cfg图是一种数据结构,类似于ida的代码块那样的,只不过是他的抽象形式。然后遍历整个有向图,找到入度为0的地方即为入口块,出度为0的地方即为retn块
  • 然后就是找到主分发块,有时主分发块就是入口块的一部分,而原程序是通过入度数量大于1来判定主分发块的,最终就导致了主分发就是入口块,这就是原来脚本不能跑起来的根本原因了。

解决办法是什么呢?通过patch一个jmp跳转指令来人为的构造一个块,把这个分发块构造出来。

为什么通jmp指令就能人为的构造一个块呢?是因为angr分析出来的CFG有向图是以jmp指令来分割的,也就是是如果有一个jmp指令,那么angr就会把它分析成一个块。这样就能成功识别出来了。

  • 通过主分发块找他的前支就只有入口块,和预处理块。然后再通过预处理块去找到所有有用块,但是这里找到的有用块里面的不是所有都是有用的,有很多长得很相似的块都是可以直接nop的,所以要找出来,在那个脚本里是直接进行进行大小判断的小于一定的大小就会直接nop,但是在不同的程序里,这个大小是变化的,所以这就是另一个有bug的地方。

找到所有有用块之后,就相当于把所有块进行了分类,分为可以nop掉的块,入口块,主分发块,预处理块和有有用块

然后就开始对每个有用块进行模拟执行,找到每个有用块之间的关系,就完成了整个代码的还原了

#!/usr/bin/env python3
import sys

sys.path.append("..")

import argparse
import angr
import pyvex
import claripy
import struct
from collections import defaultdict

import am_graph
from util import *

import logging

logging.getLogger('angr.state_plugins.symbolic_memory').setLevel(logging.ERROR)

# logging.getLogger('angr.sim_manager').setLevel(logging.DEBUG)

def get_relevant_nop_nodes(supergraph, pre_dispatcher_node, prologue_node, retn_node):
    # relevant_nodes = list(supergraph.predecessors(pre_dispatcher_node))
    relevant_nodes = []
    nop_nodes = []
    for node in supergraph.nodes():
        if supergraph.has_edge(node, pre_dispatcher_node) and node.size > 16 and node.addr != prologue_node.addr:
            # XXX: use node.size is faster than to create a block
            relevant_nodes.append(node)
            continue
        if node.addr in (prologue_node.addr, retn_node.addr, pre_dispatcher_node.addr):
            continue
        nop_nodes.append(node)
    return relevant_nodes, nop_nodes

def symbolic_execution(project, relevant_block_addrs, start_addr, hook_addrs=None, modify_value=None, inspect=False,
                       hook_size=None, pre_dispatcher_node=None):
    def retn_procedure(state):
        ip = state.solver.eval(state.regs.ip)
        project.unhook(ip)
        return

    def statement_inspect(state):
        expressions = list(
            state.scratch.irsb.statements[state.inspect.statement].expressions)
        if len(expressions) != 0 and isinstance(expressions[0], pyvex.expr.ITE):
            state.scratch.temps[expressions[0].cond.tmp] = modify_value
            state.inspect._breakpoints['statement'] = []

    if hook_addrs is not None:
        for hook_addr in hook_addrs:
            skip_length = hook_size[hook_addr]
            project.hook(hook_addr, retn_procedure, length=skip_length)

    state = project.factory.blank_state(addr=start_addr, remove_options={
        angr.sim_options.LAZY_SOLVES})
    if inspect:
        state.inspect.b(
            'statement', when=angr.state_plugins.inspect.BP_BEFORE, action=statement_inspect)
    sm = project.factory.simulation_manager(state)
    sm.step()
    # 设置要执行的圈数
    flag = 0
    while len(sm.active) > 0:
        for active_state in sm.active:
            if active_state.addr in relevant_block_addrs:
                return active_state.addr
            # 避免死循环
            if active_state.addr == pre_dispatcher_node.addr:
                flag = flag + 1
            if flag == 2:
                return None
        sm.step()

    return None

def main():
    DEBUG = True
    end = 0
    start = 0
    if DEBUG:
        filename = "./rushB"
        start = 0x4005A0
        end = 0x400A71
    else:
        parser = argparse.ArgumentParser(description="deflat control flow script")
        parser.add_argument("-f", "--file", help="binary to analyze")
        parser.add_argument(
            "--addr", help="address of target function in hex format")
        args = parser.parse_args()

        if args.file is None or args.addr is None:
            parser.print_help()
            sys.exit(0)

        filename = args.file
        start = int(args.addr, 16)

    project = angr.Project(filename, load_options={'auto_load_libs': False})
    # do normalize to avoid overlapping blocks, disable force_complete_scan to avoid possible "wrong" blocks
    # cfg = project.analyses.CFGFast(normalize=True, force_complete_scan=False)
    cfg = project.analyses.CFGFast(
        start=start >> 12 << 12,
        end=end,
        force_complete_scan=False,
        normalize=True,
    )
    target_function = cfg.functions.get(start)
    # A super transition graph is a graph that looks like IDA Pro's CFG
    supergraph = am_graph.to_supergraph(target_function.transition_graph)

    base_addr = project.loader.main_object.mapped_base >> 12 << 12

    # get prologue_node and retn_node
    prologue_node = None
    retn_node = None
    for node in supergraph.nodes():
        if supergraph.in_degree(node) == 0:
            prologue_node = node
        if supergraph.out_degree(node) == 0:
            retn_node = node
    # tset retn_node
    # block = project.factory.block(retn_node.addr, size=retn_node.size)
    # for ins in block.capstone.insns:
    #     print(ins.insn.mnemonic)

    if prologue_node is None or prologue_node.addr != start:
        print("Something must be wrong...")
        sys.exit(-1)

    # #######################################
    # 对main_dispatcher_node和pre_dispatcher_node的处理需要注意,这两正确基本可以跑出相关块,必要的时候这两个块可以相等
    # #######################################
    main_dispatcher_node = list(supergraph.successors(prologue_node))[0]
    # main_dispatcher_node = prologue_node
    # pre_dispatcher_node = main_dispatcher_node
    for node in supergraph.predecessors(main_dispatcher_node):
        if node.addr != prologue_node.addr:
            pre_dispatcher_node = node
            break

    relevant_nodes, nop_nodes = get_relevant_nop_nodes(
        supergraph, pre_dispatcher_node, prologue_node, retn_node)
    print('*******************relevant blocks************************')
    print('prologue: %#x' % start)
    print('main_dispatcher: %#x' % main_dispatcher_node.addr)
    print('pre_dispatcher: %#x' % pre_dispatcher_node.addr)
    print('retn: %#x' % retn_node.addr)
    relevant_block_addrs = [node.addr for node in relevant_nodes]
    print('relevant_blocks:', [hex(addr) for addr in relevant_block_addrs])
    print('*******************symbolic execution*********************')

    relevants = relevant_nodes
    relevants.append(prologue_node)
    relevants_without_retn = list(relevants)
    relevants.append(retn_node)
    relevant_block_addrs.extend([prologue_node.addr, retn_node.addr])

    flow = defaultdict(list)
    patch_instrs = {}

    # 注意符号执行找相关块的时候是把体类所有的函数调用都给hook了的,所以当调用的函数会修改符号执行的流程的时候这时的脚本是跑不了的,得把文件本身修了
    for relevant in relevants_without_retn:
        print('-------------------dse %#x---------------------' % relevant.addr)
        block = project.factory.block(relevant.addr, size=relevant.size)
        has_branches = False
        hook_addrs = []
        hook_size = {}
        for ins in block.capstone.insns:
            if project.arch.name in ARCH_X86:
                if ins.insn.mnemonic.startswith('cmov'):
                    # only record the first one
                    if relevant not in patch_instrs:
                        patch_instrs[relevant] = ins
                        has_branches = True

                elif ins.insn.mnemonic.startswith('call'):
                    hook_addrs.append(ins.insn.address)
                    hook_size[ins.insn.address] = len(ins.insn.bytes)
            elif project.arch.name in ARCH_ARM:
                if ins.insn.mnemonic != 'mov' and ins.insn.mnemonic.startswith('mov'):
                    if relevant not in patch_instrs:
                        patch_instrs[relevant] = ins
                        has_branches = True
                elif ins.insn.mnemonic in {'bl', 'blx'}:
                    hook_addrs.append(ins.insn.address)
            elif project.arch.name in ARCH_ARM64:
                if ins.insn.mnemonic.startswith('cset'):
                    if relevant not in patch_instrs:
                        patch_instrs[relevant] = ins
                        has_branches = True
                elif ins.insn.mnemonic in {'bl', 'blr'}:
                    hook_addrs.append(ins.insn.address)

        if has_branches:
            tmp_addr = symbolic_execution(project, relevant_block_addrs,
                                          relevant.addr, hook_addrs, claripy.BVV(1, 1), True, hook_size=hook_size, pre_dispatcher_node=pre_dispatcher_node)
            if tmp_addr is not None:
                flow[relevant].append(tmp_addr)
            tmp_addr = symbolic_execution(project, relevant_block_addrs,
                                          relevant.addr, hook_addrs, claripy.BVV(0, 1), True, hook_size=hook_size, pre_dispatcher_node=pre_dispatcher_node)
            if tmp_addr is not None:
                flow[relevant].append(tmp_addr)
        else:
            tmp_addr = symbolic_execution(project, relevant_block_addrs,
                                          relevant.addr, hook_addrs, hook_size=hook_size, pre_dispatcher_node=pre_dispatcher_node)
            if tmp_addr is not None:
                flow[relevant].append(tmp_addr)

        # tmp_addr = symbolic_execution(project, relevant_block_addrs,
        #                               relevant.addr, hook_addrs, claripy.BVV(1, 1), True, hook_size=hook_size, pre_dispatcher_node=pre_dispatcher_node)
        # if tmp_addr is not None:
        #     flow[relevant].append(tmp_addr)
        # tmp_addr = symbolic_execution(project, relevant_block_addrs,
        #                               relevant.addr, hook_addrs, claripy.BVV(0, 1), True, hook_size=hook_size, pre_dispatcher_node=pre_dispatcher_node)
        # if tmp_addr is not None:
        #     flow[relevant].append(tmp_addr)

    print('************************flow******************************')
    for k, v in flow.items():
        print('%#x: ' % k.addr, [hex(child) for child in v])
    print('%#x: ' % retn_node.addr, [])
    print('************************patch*****************************')

    with open(filename, 'rb') as origin:
        # Attention: can't transform to str by calling decode() directly. so use bytearray instead.
        origin_data = bytearray(origin.read())
        origin_data_len = len(origin_data)

    recovery_file = filename + '_recovered'
    recovery = open(recovery_file, 'wb')

    # patch irrelevant blocks
    for nop_node in nop_nodes:
        fill_nop(origin_data, nop_node.addr - base_addr,
                 nop_node.size, project.arch)
    fill_nop(origin_data, main_dispatcher_node.addr - base_addr, main_dispatcher_node.size, project.arch)
    # remove unnecessary control flows
    for parent, childs in flow.items():
        if len(childs) == 1:
            parent_block = project.factory.block(parent.addr, size=parent.size)
            last_instr = parent_block.capstone.insns[-1]
            size_last = len(last_instr.insn.bytes)
            # add
            last_two_instr = parent_block.capstone.insns[-2]
            size_last_two = len(last_two_instr.insn.bytes)
            # nop_instructions = last_two_instr.address
            nop_size = size_last + size_last_two

            file_offset = last_two_instr.address - base_addr
            # patch the last instruction to jmp
            if project.arch.name in ARCH_X86:
                # fill_nop(origin_data, file_offset,
                #          last_instr.size, project.arch)
                fill_nop(origin_data, file_offset, nop_size, project.arch)
                patch_value = ins_j_jmp_hex_x86(last_two_instr.address, childs[0], 'jmp')
            elif project.arch.name in ARCH_ARM:
                patch_value = ins_b_jmp_hex_arm(last_instr.address, childs[0], 'b')
                if project.arch.memory_endness == "Iend_BE":
                    patch_value = patch_value[::-1]
            elif project.arch.name in ARCH_ARM64:
                # FIXME: For aarch64/arm64, the last instruction of prologue seems useful in some cases, so patch the next instruction instead.
                if parent.addr == start:
                    file_offset += 4
                    patch_value = ins_b_jmp_hex_arm64(last_instr.address + 4, childs[0], 'b')
                else:
                    patch_value = ins_b_jmp_hex_arm64(last_instr.address, childs[0], 'b')
                if project.arch.memory_endness == "Iend_BE":
                    patch_value = patch_value[::-1]
            patch_instruction(origin_data, file_offset, patch_value)
        else:
            instr = patch_instrs[parent]
            file_offset = instr.address - base_addr
            # patch instructions starting from `cmovx` to the end of block
            size = 0
            fill_nop(origin_data, file_offset, parent.addr +
                     parent.size - base_addr - file_offset, project.arch)
            if project.arch.name in ARCH_X86:
                # patch the cmovx instruction to jx instruction
                patch_value = ins_j_jmp_hex_x86(instr.address, childs[0], instr.mnemonic[len('cmov'):])
                patch_instruction(origin_data, file_offset, patch_value, size)

                file_offset += 6
                # patch the next instruction to jmp instrcution
                patch_value = ins_j_jmp_hex_x86(instr.address + 6, childs[1], 'jmp')
                patch_instruction(origin_data, file_offset, patch_value, size)
            elif project.arch.name in ARCH_ARM:
                # patch the movx instruction to bx instruction
                bx_cond = 'b' + instr.mnemonic[len('mov'):]
                patch_value = ins_b_jmp_hex_arm(instr.address, childs[0], bx_cond)
                if project.arch.memory_endness == 'Iend_BE':
                    patch_value = patch_value[::-1]
                patch_instruction(origin_data, file_offset, patch_value, size)

                file_offset += 4
                # patch the next instruction to b instrcution
                patch_value = ins_b_jmp_hex_arm(instr.address + 4, childs[1], 'b')
                if project.arch.memory_endness == 'Iend_BE':
                    patch_value = patch_value[::-1]
                patch_instruction(origin_data, file_offset, patch_value, size)
            elif project.arch.name in ARCH_ARM64:
                # patch the cset.xx instruction to bx instruction
                bx_cond = instr.op_str.split(',')[-1].strip()
                patch_value = ins_b_jmp_hex_arm64(instr.address, childs[0], bx_cond)
                if project.arch.memory_endness == 'Iend_BE':
                    patch_value = patch_value[::-1]
                patch_instruction(origin_data, file_offset, patch_value, size)

                file_offset += 4
                # patch the next instruction to b instruction
                patch_value = ins_b_jmp_hex_arm64(instr.address + 4, childs[1], 'b')
                if project.arch.memory_endness == 'Iend_BE':
                    patch_value = patch_value[::-1]
                patch_instruction(origin_data, file_offset, patch_value, size)

    assert len(origin_data) == origin_data_len, "Error: size of data changed!!!"
    recovery.write(origin_data)
    recovery.close()
    print('Successful! The recovered file: %s' % recovery_file)
    for nop_node in nop_nodes:
        fill_nop(origin_data, nop_node.addr - base_addr,
                 nop_node.size, project.arch)

        # remove unnecessary control flows
    for parent, childs in flow.items():
        if len(childs) == 1:
            parent_block = project.factory.block(parent.addr, size=parent.size)
            last_instr = parent_block.capstone.insns[-1]
            file_offset = last_instr.address - base_addr
            # patch the last instruction to jmp
            if project.arch.name in ARCH_X86:
                fill_nop(origin_data, file_offset,
                         last_instr.size, project.arch)
                patch_value = ins_j_jmp_hex_x86(last_instr.address, childs[0], 'jmp')
            elif project.arch.name in ARCH_ARM:
                patch_value = ins_b_jmp_hex_arm(last_instr.address, childs[0], 'b')
                if project.arch.memory_endness == "Iend_BE":
                    patch_value = patch_value[::-1]
            elif project.arch.name in ARCH_ARM64:
                # FIXME: For aarch64/arm64, the last instruction of prologue seems useful in some cases, so patch the next instruction instead.
                if parent.addr == start:
                    file_offset += 4
                    patch_value = ins_b_jmp_hex_arm64(last_instr.address + 4, childs[0], 'b')
                else:
                    patch_value = ins_b_jmp_hex_arm64(last_instr.address, childs[0], 'b')
                if project.arch.memory_endness == "Iend_BE":
                    patch_value = patch_value[::-1]
            patch_instruction(origin_data, file_offset, patch_value)
        else:
            instr = patch_instrs[parent]
            file_offset = instr.address - base_addr
            # patch instructions starting from `cmovx` to the end of block
            fill_nop(origin_data, file_offset, parent.addr +
                     parent.size - base_addr - file_offset, project.arch)
            if project.arch.name in ARCH_X86:
                # patch the cmovx instruction to jx instruction
                patch_value = ins_j_jmp_hex_x86(instr.address, childs[0], instr.mnemonic[len('cmov'):])
                patch_instruction(origin_data, file_offset, patch_value)

                file_offset += 6
                # patch the next instruction to jmp instrcution
                patch_value = ins_j_jmp_hex_x86(instr.address + 6, childs[1], 'jmp')
                patch_instruction(origin_data, file_offset, patch_value)
            elif project.arch.name in ARCH_ARM:
                # patch the movx instruction to bx instruction
                bx_cond = 'b' + instr.mnemonic[len('mov'):]
                patch_value = ins_b_jmp_hex_arm(instr.address, childs[0], bx_cond)
                if project.arch.memory_endness == 'Iend_BE':
                    patch_value = patch_value[::-1]
                patch_instruction(origin_data, file_offset, patch_value)

                file_offset += 4
                # patch the next instruction to b instrcution
                patch_value = ins_b_jmp_hex_arm(instr.address + 4, childs[1], 'b')
                if project.arch.memory_endness == 'Iend_BE':
                    patch_value = patch_value[::-1]
                patch_instruction(origin_data, file_offset, patch_value)
            elif project.arch.name in ARCH_ARM64:
                # patch the cset.xx instruction to bx instruction
                bx_cond = instr.op_str.split(',')[-1].strip()
                patch_value = ins_b_jmp_hex_arm64(instr.address, childs[0], bx_cond)
                if project.arch.memory_endness == 'Iend_BE':
                    patch_value = patch_value[::-1]
                patch_instruction(origin_data, file_offset, patch_value)

                file_offset += 4
                # patch the next instruction to b instruction
                patch_value = ins_b_jmp_hex_arm64(instr.address + 4, childs[1], 'b')
                if project.arch.memory_endness == 'Iend_BE':
                    patch_value = patch_value[::-1]
                patch_instruction(origin_data, file_offset, patch_value)

    assert len(origin_data) == origin_data_len, "Error: size of data changed!!!"
    recovery.write(origin_data)
    recovery.close()
    print('Successful! The recovered file: %s' % recovery_file)

if __name__ == '__main__':
    main()

采用angr的缺点

由于angr脚本里是直接以块为起点,模拟执行来找到下一个块的,如果在这个块之前还产生了对这个块的执行流程产生了影响的事件,那么本次模拟处理的结果就是不准确的。

unicorn版

为什么要用unicorn呢?

因为unicorn就相当于一个虚拟cpu,所以处理的细微程度远远高于angr的代码块级,修复出来的代码精度更高。如果能用unicorn从入口块开始完整模拟执行到retn块结束,就能相对完整的还原整个代码。

由于unicorn的模拟代码还没写完,这里就暂时不贴出来了(等我写完,就补上

小结

总体而言,ollvm现在已经不算一个很难的问题了,除非ollvm里面有异常处理。但是这种问题实战遇到的情况不多,打ctf遇到的要多一些。

Q.E.D.