CPython internal AST structure deep-copy functions
2024-07-26 05:16:43

source codes

deepcopy_ast.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
###
# This script is used to generate deepcopy functions for CPython internal AST nodes.
# License: MIT
# Author: Eritque arcus
# Date: July 20, 2024
###

import sys

if len(sys.argv) != 3:
print("Usage: python deepcopy_ast.py <header> <output>")
sys.exit(1)

TARGETS = ["expr", "stmt", "excepthandler",
"pattern", "type_ignore", "type_param", "mod"]
INDIVIDUAL_TARGETS = ["comprehension", "arguments",
"arg", "keyword", "alias", "withitem", "match_case"]


UNION_TARGET_TEMPLATE = """%s_ty %s_copy(%s_ty val, PyArena *arena)
{
if(val == NULL){
return NULL;
}
%s_ty re = _PyArena_Malloc(arena, sizeof(struct _%s));
re->kind = val->kind;
switch (val->kind) {
BODY
}
OTHER
return re;
}"""

UNION_STMT_PATCH = """
case Pass_kind:
case Break_kind:
case Continue_kind:
break;
""".strip()

INDIVIDUAL_TARGET_TEMPLATE = """%s_ty %s_copy(%s_ty val, PyArena *arena)
{
if(val == NULL){
return NULL;
}
%s_ty re = _PyArena_Malloc(arena, sizeof(struct _%s));
BODY
return re;
}"""

header = sys.argv[1]
with open(header, "r", encoding="utf-8") as f:
codes_raw = f.read().strip().split("\n")


def parse_struct(start: int, codes: list[str], name: str = ""):
parse_code = "\n"
end = start + 1
while not codes[end].replace(" ", "").startswith("}"):
end += 1
if name == "":
name = codes[end].replace(" ", "").removeprefix("}").removesuffix(";")
for j in range(start + 1, end):
if codes[j].strip() == "":
continue
code = codes[j].strip().split(" ")
n = code[1].removesuffix(";").removeprefix("*")
t = code[0].removesuffix("_ty")
if t == "int":
if name is None:
parse_code += f"re->{n} = val->{n};\n"
else:
parse_code += f"re->v.{name}.{n} = val->v.{name}.{n};\n"
else:
# print(name, t, n)
if name is None:
parse_code += f"re->{n} = {t}_copy(val->{n}, arena);\n"
else:
parse_code += f"re->v.{name}.{n} = {t}_copy(val->v.{name}.{n}, arena);\n"
parse_code = parse_code.replace("\n", "\n" + " " * 3)
if name is not None:
parse_code = f"case {name}_kind:{parse_code}break;"
return (end, parse_code)


def gen_targets(codes: list[str], target: str):
i = 0
while i < len(codes) and codes[i].replace(" ", "") != "struct_%s{" % target:
i += 1
codes = codes[i + 1:]
i = 0
while i < len(codes) and codes[i].replace(" ", "") != "}v;":
i += 1
body_codes = codes[:i]
ii = i
while ii < len(codes) and codes[ii].replace(" ", "") != "};":
ii += 1
other_codes = codes[i:ii + 1]

template = UNION_TARGET_TEMPLATE % (target, target, target, target, target)
body = ""

i = 0
base = 0
while True:
while i < len(body_codes) and body_codes[i].replace(" ", "") != "struct{":
i += 1
if i == len(body_codes):
break
i, code = parse_struct(i + base, body_codes)
body += code + "\n"
if target == "stmt":
body += UNION_STMT_PATCH
body = body.replace("\n", "\n" + " " * 6)

if len(other_codes) > 1:
_, code = parse_struct(0, other_codes, None)
template = template.replace("OTHER", code)

return template.replace("BODY", " " * 3 + body)


def gen_individual_targets(codes: list[str], target: str):
i = 0
while i < len(codes) and codes[i].replace(" ", "") != "struct_%s{" % target:
i += 1
codes = codes[i:]
i = 0
while i < len(codes) and codes[i] != "};":
i += 1
codes = codes[:i + 1]
template = INDIVIDUAL_TARGET_TEMPLATE % (
target, target, target, target, target)
_, body = parse_struct(0, codes, None)
return template.replace("BODY", " " * 3 + body)


FILE_CONTENT = "#include \"deepcopy_gen.h\"\n"
HEADER_CONTENT = "#ifndef DEEPCOPY_GEN_H\n#define DEEPCOPY_GEN_H\n\n#include \"deepcopy.h\"\n\n"

for t in TARGETS:
FILE_CONTENT += gen_targets(codes_raw, t) + "\n"
HEADER_CONTENT += f"{t}_ty {t}_copy({t}_ty val, PyArena *arena);\n"
for t in INDIVIDUAL_TARGETS:
FILE_CONTENT += gen_individual_targets(codes_raw, t) + "\n"
HEADER_CONTENT += f"{t}_ty {t}_copy({t}_ty val, PyArena *arena);\n"

with open(sys.argv[2] + ".c", "w", encoding="utf-8") as f:
f.write(FILE_CONTENT)
with open(sys.argv[2] + ".h", "w", encoding="utf-8") as f:
f.write(HEADER_CONTENT + "\n#endif")

and some manually written implementations

deepcopy.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#ifndef DEEPCOPY_H
#define DEEPCOPY_H

#define Py_BUILD_CORE 1
#include "Python.h"
#include "pycore_ast.h"

#include "deepcopy_gen.h"

expr_ty expr_copy(expr_ty val, PyArena *arena);
stmt_ty stmt_copy(stmt_ty val, PyArena *arena);

#define ASDL_SEQ_COPY_HEADER(type) \
type *type##_copy(type *seq, PyArena *arena); \
type *type##_copy_add(type *seq, PyArena *arena, int add_size); \
type *type##_copy_add_front(type *seq, PyArena *arena, int add_size);

ASDL_SEQ_COPY_HEADER(asdl_expr_seq)
ASDL_SEQ_COPY_HEADER(asdl_stmt_seq)
ASDL_SEQ_COPY_HEADER(asdl_keyword_seq)
ASDL_SEQ_COPY_HEADER(asdl_int_seq)
ASDL_SEQ_COPY_HEADER(asdl_arg_seq)
ASDL_SEQ_COPY_HEADER(asdl_type_param_seq)
ASDL_SEQ_COPY_HEADER(asdl_comprehension_seq)
ASDL_SEQ_COPY_HEADER(asdl_alias_seq)
ASDL_SEQ_COPY_HEADER(asdl_excepthandler_seq)
ASDL_SEQ_COPY_HEADER(asdl_type_ignore_seq)
ASDL_SEQ_COPY_HEADER(asdl_withitem_seq)
ASDL_SEQ_COPY_HEADER(asdl_match_case_seq)
ASDL_SEQ_COPY_HEADER(asdl_identifier_seq)
ASDL_SEQ_COPY_HEADER(asdl_pattern_seq)

#define TRIAL_COPY_HEDER(type) type##_ty type##_copy(type##_ty val, PyArena *arena);
TRIAL_COPY_HEDER(boolop)
TRIAL_COPY_HEDER(operator)
TRIAL_COPY_HEDER(unaryop)
TRIAL_COPY_HEDER(cmpop)
TRIAL_COPY_HEDER(expr_context)
identifier identifier_copy(identifier val, PyArena *arena);
PyObject *constant_copy(PyObject *val, PyArena *arena);
PyObject *string_copy(PyObject *val, PyArena *arena);
// PyObject *object_copy(PyObject *val, PyArena *arena);

#endif // DEEPCOPY_H

deepcopy.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#include "deepcopy.h"

// implement log funcs yourself
#define INFO(fmt, ...) info(fmt, ##__VA_ARGS__)
#define ERROR(fmt, ...) error(fmt, ##__VA_ARGS__)
#define PANIC(fmt, ...) panic(fmt, ##__VA_ARGS__)

PyObject *PyUnicode_FromString_Arena(const char *s, PyArena *arena)
{
PyObject *re = (PyObject *)PyUnicode_FromString(s);
_PyArena_AddPyObject(arena, re);
// check obj2ast_constant from CPython source code
Py_INCREF(re);
return re;
}

PyObject *PyLong_Copy_Arena(PyObject *s, PyArena *arena)
{
if (s == NULL)
{
return NULL;
}
PyObject *re = (PyObject *)PyLong_FromLongLong(PyLong_AsLongLong(s));
_PyArena_AddPyObject(arena, re);
// check obj2ast_constant from CPython source code
Py_INCREF(re);
return re;
}

PyObject *PyUnicode_Copy_Arena(PyObject *s, PyArena *arena)
{
if (s == NULL)
{
return NULL;
}
Py_ssize_t length = PyUnicode_GET_LENGTH(s);
PyObject *re = (PyObject *)PyUnicode_New(length, PyUnicode_MAX_CHAR_VALUE(s));
memcpy(PyUnicode_DATA(re), PyUnicode_DATA(s), length * PyUnicode_KIND(s)); // from _PyUnicode_Copy but no checks
_PyArena_AddPyObject(arena, re);
// check obj2ast_constant from CPython source code
Py_INCREF(re);
return re;
}

PyObject *PyLong_FromLong_Arena(long n, PyArena *arena)
{
PyObject *re = (PyObject *)PyLong_FromLong(n);
_PyArena_AddPyObject(arena, re);
// check obj2ast_constant from CPython source code
Py_INCREF(re);
return re;
}

#define TRIAL_COPY(type) \
type##_ty type##_copy(type##_ty val, PyArena *arena) \
{ \
return val; \
}
TRIAL_COPY(boolop)
TRIAL_COPY(operator)
TRIAL_COPY(unaryop)
TRIAL_COPY(cmpop)
TRIAL_COPY(expr_context)

identifier identifier_copy(identifier val, PyArena *arena)
{
return PyUnicode_Copy_Arena(val, arena);
}
PyObject *string_copy(PyObject *val, PyArena *arena)
{
return PyUnicode_Copy_Arena(val, arena);
}
PyObject *constant_copy(PyObject *val, PyArena *arena)
{
if (PyLong_CheckExact(val))
{
return PyLong_Copy_Arena(val, arena);
}
else if (PyUnicode_CheckExact(val))
{
return PyUnicode_Copy_Arena(val, arena);
}
ERROR("Unknown constant type, use fallback - %s\n", val->ob_type->tp_name);
_PyArena_AddPyObject(arena, val);
Py_INCREF(val);
return val;
}

#define ASDL_SEQ_COPY_COMBINED(type) \
ASDL_SEQ_COPY_ADD_FRONT(type) \
ASDL_SEQ_COPY_ADD(type) \
ASDL_SEQ_COPY(type)

#define ASDL_SEQ_COPY_ADD(type) \
asdl_##type##_seq *asdl_##type##_seq##_copy_add(asdl_##type##_seq *seq, PyArena *arena, int add_size) \
{ \
asdl_##type##_seq *re; \
if (seq == NULL) \
{ \
re = _Py_##asdl_##type##_seq##_new(add_size, arena); \
} \
else \
{ \
re = _Py_##asdl_##type##_seq##_new(seq->size + add_size, arena); \
for (int i = 0; i < seq->size; i++) \
{ \
re->typed_elements[i] = type##_copy(seq->typed_elements[i], arena); \
} \
} \
return re; \
}

#define ASDL_SEQ_COPY_ADD_FRONT(type) \
asdl_##type##_seq *asdl_##type##_seq##_copy_add_front(asdl_##type##_seq *seq, PyArena *arena, int add_size) \
{ \
asdl_##type##_seq *re; \
if (seq == NULL) \
{ \
re = _Py_##asdl_##type##_seq##_new(add_size, arena); \
} \
else \
{ \
re = _Py_##asdl_##type##_seq##_new(seq->size + add_size, arena); \
for (int i = 1; i < seq->size + 1; i++) \
{ \
re->typed_elements[i] = type##_copy(seq->typed_elements[i - 1], arena); \
} \
} \
return re; \
}

#define ASDL_SEQ_COPY(type) \
asdl_##type##_seq *asdl_##type##_seq##_copy(asdl_##type##_seq *seq, PyArena *arena) \
{ \
if (seq == NULL) \
{ \
return NULL; \
} \
asdl_##type##_seq *re = _Py_##asdl_##type##_seq##_new(seq->size, arena); \
for (int i = 0; i < seq->size; i++) \
{ \
re->typed_elements[i] = type##_copy(seq->typed_elements[i], arena); \
} \
return re; \
}

ASDL_SEQ_COPY_COMBINED(expr)
ASDL_SEQ_COPY_COMBINED(stmt)
ASDL_SEQ_COPY_COMBINED(keyword)
asdl_int_seq *asdl_int_seq_copy(asdl_int_seq *seq, PyArena *arena)
{
asdl_int_seq *re = _Py_asdl_int_seq_new(seq->size, arena);
for (int i = 0; i < seq->size; i++)
{
re->typed_elements[i] = seq->typed_elements[i];
}
return re;
}
asdl_int_seq *asdl_int_seq_copy_add(asdl_int_seq *seq, PyArena *arena, int add_size)
{
asdl_int_seq *re = _Py_asdl_int_seq_new(seq->size + add_size, arena);
for (int i = 0; i < seq->size; i++)
{
re->typed_elements[i] = seq->typed_elements[i];
}
return re;
}
ASDL_SEQ_COPY_COMBINED(arg)
ASDL_SEQ_COPY_COMBINED(comprehension)
ASDL_SEQ_COPY_COMBINED(type_param)
ASDL_SEQ_COPY_COMBINED(alias)
ASDL_SEQ_COPY_COMBINED(excepthandler)
ASDL_SEQ_COPY_COMBINED(type_ignore)
ASDL_SEQ_COPY_COMBINED(withitem)
ASDL_SEQ_COPY_COMBINED(match_case)
ASDL_SEQ_COPY_COMBINED(identifier)
ASDL_SEQ_COPY_COMBINED(pattern)

How to use

Run the script with pycore_ast.h as first argument, and the destination filename (w/o extension) as second argument.
E.g:

1
python deepcopy_ast.py $(readlink -f $CPYTHON_BIN_PATH/include/python3.*/internal/pycore_ast.h) $SRC_PATH/codgen/deepcopy_gen

Then add all above files into your project: deepcopy_gen.c and deepcopy.c.
Check CPython-AST-Fuzzer

Prev
2024-07-26 05:16:43