skip to content

CPython internal AST structure deep-copy functions

/ 10 min read

Table of Contents

Deep copy

Code generation for deep-copying the Python AST tree:

###
# This script is used to generate deepcopy functions for CPython internal AST nodes.
# License: MIT
# Author: Eritque arcus
# Date: July 20, 2024
###
import sys
# Exactly two positional arguments are required (the header to read and the
# output path prefix); otherwise print the usage line and exit with status 1.
if len(sys.argv[1:]) != 2:
    print("Usage: python deepcopy_ast.py <header> <output>")
    sys.exit(1)
# Node types processed by gen_targets(): their struct holds a `kind` tag and a
# `v` union, so the generated copy function switches on `kind`.
TARGETS = ["expr", "stmt", "excepthandler",
           "pattern", "type_ignore", "type_param", "mod"]
# Node types processed by gen_individual_targets(): their fields are copied
# one by one without a switch.
INDIVIDUAL_TARGETS = ["comprehension", "arguments",
                      "arg", "keyword", "alias", "withitem", "match_case"]
# C skeleton for a tagged-union copy function. The five %s slots all receive
# the target name; BODY is replaced by the per-kind `case` blocks and OTHER by
# the copies of the fields that follow the union.
# The arena allocation is checked before use: _PyArena_Malloc returns NULL on
# failure and the generated code would otherwise dereference it immediately.
UNION_TARGET_TEMPLATE = """%s_ty %s_copy(%s_ty val, PyArena *arena)
{
if(val == NULL){
return NULL;
}
%s_ty re = _PyArena_Malloc(arena, sizeof(struct _%s));
if(re == NULL){
return NULL;
}
re->kind = val->kind;
switch (val->kind) {
BODY
}
OTHER
return re;
}"""
# Extra `case` labels appended for "stmt": these kinds have no struct in the
# union, so parse_struct() never emits a case for them.
UNION_STMT_PATCH = """
case Pass_kind:
case Break_kind:
case Continue_kind:
break;
""".strip()
# C skeleton for a plain-struct copy function; BODY is replaced by the
# field-by-field copies. The allocation is NULL-checked as above.
INDIVIDUAL_TARGET_TEMPLATE = """%s_ty %s_copy(%s_ty val, PyArena *arena)
{
if(val == NULL){
return NULL;
}
%s_ty re = _PyArena_Malloc(arena, sizeof(struct _%s));
if(re == NULL){
return NULL;
}
BODY
return re;
}"""
# Read the header named by the first CLI argument and keep it as a list of
# lines (surrounding whitespace of the whole file is trimmed first).
header = sys.argv[1]
with open(header, mode="r", encoding="utf-8") as f:
    header_text = f.read()
codes_raw = header_text.strip().split("\n")
def parse_struct(start: int, codes: list[str], name: "str | None" = ""):
    """Translate the fields of one C struct body into C copy statements.

    Args:
        start: Index in ``codes`` of the line that opens the struct; fields
            are read from the following line onwards.
        codes: Header lines to scan.
        name: Union member name, used to address fields as
            ``v.<name>.<field>``. The default ``""`` reads the name from the
            closing ``} Name;`` line. ``None`` means the fields sit directly
            on the node: they are addressed as ``re-><field>`` and no ``case``
            label is emitted.

    Returns:
        ``(end, code)``: ``end`` is the index of the closing-brace line and
        ``code`` is the generated C text (each statement indented by three
        spaces; wrapped in ``case <name>_kind: ... break;`` unless ``name``
        is ``None``).

    Raises:
        ValueError: If no closing brace follows ``start`` or a field line does
            not have the ``<type> <name>;`` shape.
    """
    def compact(line):
        # Drop every whitespace character so tabs do not defeat the matching.
        return "".join(line.split())

    end = start + 1
    while end < len(codes) and not compact(codes[end]).startswith("}"):
        end += 1
    if end >= len(codes):
        raise ValueError(f"no closing brace after struct opened at index {start}")
    if name == "":
        name = compact(codes[end]).removeprefix("}").removesuffix(";")

    prefix = "" if name is None else f"v.{name}."
    statements = []
    for line in codes[start + 1:end]:
        # Split on any run of whitespace so aligned columns still parse.
        parts = line.split()
        if not parts:
            continue
        if len(parts) < 2:
            raise ValueError(f"cannot parse struct field: {line.strip()!r}")
        field_type = parts[0].removesuffix("_ty")
        field = parts[1].removesuffix(";").removeprefix("*")
        if field_type == "int":
            # Plain ints are copied by value; everything else goes through
            # the matching <type>_copy helper.
            statements.append(f"re->{prefix}{field} = val->{prefix}{field};")
        else:
            statements.append(
                f"re->{prefix}{field} = {field_type}_copy(val->{prefix}{field}, arena);"
            )

    parse_code = "\n" + "".join(stmt + "\n" for stmt in statements)
    parse_code = parse_code.replace("\n", "\n" + " " * 3)
    if name is not None:
        parse_code = f"case {name}_kind:{parse_code}break;"
    return (end, parse_code)
def gen_targets(codes: list[str], target: str):
    """Generate the C ``<target>_copy`` function for a tagged-union node type.

    The header is expected to contain ``struct _<target> {``, a union closed
    by ``} v;`` and a final ``};``. Every ``struct {`` inside the union becomes
    one ``case`` of the generated switch; the fields between ``} v;`` and
    ``};`` are copied after the switch.

    Args:
        codes: Lines of the header.
        target: Node type name without the ``_ty`` suffix, e.g. ``"expr"``.

    Returns:
        The C source of the copy function.

    Raises:
        ValueError: If the struct, its ``} v;`` line or its closing ``};``
            cannot be found. Failing here avoids emitting a C function that
            still contains the template placeholders.
    """
    compact = [line.replace(" ", "") for line in codes]
    try:
        struct_at = compact.index("struct_%s{" % target)
        union_end = compact.index("}v;", struct_at + 1)
        struct_end = compact.index("};", union_end + 1)
    except ValueError as err:
        raise ValueError(
            f"could not locate 'struct _{target}' with its 'v' union in the header"
        ) from err
    body_codes = codes[struct_at + 1:union_end]
    other_codes = codes[union_end:struct_end + 1]

    cases = []
    i = 0
    while True:
        # Advance to the next anonymous struct inside the union.
        while i < len(body_codes) and body_codes[i].replace(" ", "") != "struct{":
            i += 1
        if i == len(body_codes):
            break
        # parse_struct returns the index of the closing line; resume there.
        i, code = parse_struct(i, body_codes)
        cases.append(code + "\n")
    body = "".join(cases)
    if target == "stmt":
        body += UNION_STMT_PATCH
    body = body.replace("\n", "\n" + " " * 6)

    _, other = parse_struct(0, other_codes, None)
    template = UNION_TARGET_TEMPLATE % ((target,) * 5)
    template = template.replace("OTHER", other)
    return template.replace("BODY", " " * 3 + body)
def gen_individual_targets(codes: list[str], target: str):
    """Generate the C ``<target>_copy`` function for a plain-struct node type.

    Args:
        codes: Lines of the header.
        target: Node type name without the ``_ty`` suffix, e.g. ``"arg"``.

    Returns:
        The C source of the copy function.

    Raises:
        ValueError: If ``struct _<target> {`` or its closing ``};`` cannot be
            found in ``codes``.
    """
    # Lines are compared with spaces removed, the same way gen_targets() does.
    compact = [line.replace(" ", "") for line in codes]
    try:
        first = compact.index("struct_%s{" % target)
        last = compact.index("};", first + 1)
    except ValueError as err:
        raise ValueError(
            f"could not locate 'struct _{target}' in the header"
        ) from err
    template = INDIVIDUAL_TARGET_TEMPLATE % ((target,) * 5)
    _, body = parse_struct(0, codes[first:last + 1], None)
    return template.replace("BODY", " " * 3 + body)
# Build the generated C source and its header: one copy function plus the
# matching prototype per target, union targets first, then plain structs.
FILE_CONTENT = "#include \"deepcopy_gen.h\"\n"
HEADER_CONTENT = "#ifndef DEEPCOPY_GEN_H\n#define DEEPCOPY_GEN_H\n\n#include \"deepcopy.h\"\n\n"
for _names, _generate in ((TARGETS, gen_targets),
                          (INDIVIDUAL_TARGETS, gen_individual_targets)):
    for t in _names:
        FILE_CONTENT += _generate(codes_raw, t) + "\n"
        HEADER_CONTENT += f"{t}_ty {t}_copy({t}_ty val, PyArena *arena);\n"

# The second CLI argument is the output path without extension.
output_prefix = sys.argv[2]
with open(output_prefix + ".c", "w", encoding="utf-8") as f:
    f.write(FILE_CONTENT)
with open(output_prefix + ".h", "w", encoding="utf-8") as f:
    f.write(HEADER_CONTENT + "\n#endif")

The generated code also relies on some manually written implementations:

deepcopy.h
// Declarations of the hand-written deep-copy helpers that accompany the
// generated deepcopy_gen.{c,h}.
#ifndef DEEPCOPY_H
#define DEEPCOPY_H
// Defined before including the CPython headers below (pycore_ast.h lives in
// CPython's internal include directory).
#define Py_BUILD_CORE 1
#include "Python.h"
#include "pycore_ast.h"
#include "deepcopy_gen.h"
// The generator script also emits prototypes for these two in deepcopy_gen.h.
expr_ty expr_copy(expr_ty val, PyArena *arena);
stmt_ty stmt_copy(stmt_ty val, PyArena *arena);
// Declares the three copy helpers of one asdl sequence type:
//   <type>_copy           - copy of the sequence
//   <type>_copy_add       - copy with room for add_size more elements
//   <type>_copy_add_front - like _copy_add, with the copied elements shifted
//                           away from index 0
#define ASDL_SEQ_COPY_HEADER(type) \
type *type##_copy(type *seq, PyArena *arena); \
type *type##_copy_add(type *seq, PyArena *arena, int add_size); \
type *type##_copy_add_front(type *seq, PyArena *arena, int add_size);
ASDL_SEQ_COPY_HEADER(asdl_expr_seq)
ASDL_SEQ_COPY_HEADER(asdl_stmt_seq)
ASDL_SEQ_COPY_HEADER(asdl_keyword_seq)
ASDL_SEQ_COPY_HEADER(asdl_int_seq)
ASDL_SEQ_COPY_HEADER(asdl_arg_seq)
ASDL_SEQ_COPY_HEADER(asdl_type_param_seq)
ASDL_SEQ_COPY_HEADER(asdl_comprehension_seq)
ASDL_SEQ_COPY_HEADER(asdl_alias_seq)
ASDL_SEQ_COPY_HEADER(asdl_excepthandler_seq)
ASDL_SEQ_COPY_HEADER(asdl_type_ignore_seq)
ASDL_SEQ_COPY_HEADER(asdl_withitem_seq)
ASDL_SEQ_COPY_HEADER(asdl_match_case_seq)
ASDL_SEQ_COPY_HEADER(asdl_identifier_seq)
ASDL_SEQ_COPY_HEADER(asdl_pattern_seq)
// Prototype for the copy function of a <type>_ty value; the matching
// definitions in deepcopy.c (TRIAL_COPY) return the value unchanged.
// NOTE(review): "HEDER" is spelled this way in the macro name (sic).
#define TRIAL_COPY_HEDER(type) type##_ty type##_copy(type##_ty val, PyArena *arena);
TRIAL_COPY_HEDER(boolop)
TRIAL_COPY_HEDER(operator)
TRIAL_COPY_HEDER(unaryop)
TRIAL_COPY_HEDER(cmpop)
TRIAL_COPY_HEDER(expr_context)
// Copies of the Python objects referenced from AST nodes.
identifier identifier_copy(identifier val, PyArena *arena);
PyObject *constant_copy(PyObject *val, PyArena *arena);
PyObject *string_copy(PyObject *val, PyArena *arena);
// PyObject *object_copy(PyObject *val, PyArena *arena);
#endif // DEEPCOPY_H
deepcopy.c
#include "deepcopy.h"
// Logging hooks: info(), error() and panic() are not defined in this file,
// so implement these log functions yourself.
// `##__VA_ARGS__` lets the macros be used with a format string only.
#define INFO(fmt, ...) info(fmt, ##__VA_ARGS__)
#define ERROR(fmt, ...) error(fmt, ##__VA_ARGS__)
#define PANIC(fmt, ...) panic(fmt, ##__VA_ARGS__)
/*
 * Create a str object from the NUL-terminated UTF-8 string `s` and register
 * it with `arena`.
 *
 * Returns the new object, or NULL if it could not be created or could not be
 * registered with the arena (the failing call has set the Python error).
 */
PyObject *PyUnicode_FromString_Arena(const char *s, PyArena *arena)
{
    PyObject *re = PyUnicode_FromString(s);
    if (re == NULL)
    {
        return NULL;
    }
    if (_PyArena_AddPyObject(arena, re) < 0)
    {
        // Registration failed, so the arena did not take our reference.
        Py_DECREF(re);
        return NULL;
    }
    // check obj2ast_constant from CPython source code
    // NOTE(review): the arena already holds a reference at this point; this
    // extra one is kept from the original code - confirm it is intended.
    Py_INCREF(re);
    return re;
}
/*
 * Copy the int object `s` into `arena`.
 *
 * Values that fit in a `long long` are rebuilt as a new int object. Larger
 * values cannot make that round trip (the conversion would yield -1 and leave
 * an OverflowError pending), so the original object is registered with the
 * arena and shared instead.
 *
 * Returns NULL if `s` is NULL or if any step fails.
 */
PyObject *PyLong_Copy_Arena(PyObject *s, PyArena *arena)
{
    if (s == NULL)
    {
        return NULL;
    }
    int overflow = 0;
    long long value = PyLong_AsLongLongAndOverflow(s, &overflow);
    if (value == -1 && overflow == 0 && PyErr_Occurred())
    {
        return NULL;
    }
    PyObject *re;
    if (overflow != 0)
    {
        // Out of range for long long: share the original object.
        Py_INCREF(s);
        re = s;
    }
    else
    {
        re = PyLong_FromLongLong(value);
        if (re == NULL)
        {
            return NULL;
        }
    }
    if (_PyArena_AddPyObject(arena, re) < 0)
    {
        // Registration failed, so the arena did not take our reference.
        Py_DECREF(re);
        return NULL;
    }
    // check obj2ast_constant from CPython source code
    Py_INCREF(re);
    return re;
}
/*
 * Copy the str object `s` into a new str object registered with `arena`.
 *
 * Returns NULL if `s` is NULL, if the new string cannot be allocated, or if
 * it cannot be registered with the arena.
 */
PyObject *PyUnicode_Copy_Arena(PyObject *s, PyArena *arena)
{
    if (s == NULL)
    {
        return NULL;
    }
    Py_ssize_t length = PyUnicode_GET_LENGTH(s);
    PyObject *re = PyUnicode_New(length, PyUnicode_MAX_CHAR_VALUE(s));
    if (re == NULL)
    {
        // Must be checked before the memcpy below writes into the buffer.
        return NULL;
    }
    // from _PyUnicode_Copy but no checks
    memcpy(PyUnicode_DATA(re), PyUnicode_DATA(s), length * PyUnicode_KIND(s));
    if (_PyArena_AddPyObject(arena, re) < 0)
    {
        // Registration failed, so the arena did not take our reference.
        Py_DECREF(re);
        return NULL;
    }
    // check obj2ast_constant from CPython source code
    Py_INCREF(re);
    return re;
}
/*
 * Create an int object with value `n` and register it with `arena`.
 *
 * Returns NULL if the object cannot be created or cannot be registered with
 * the arena.
 */
PyObject *PyLong_FromLong_Arena(long n, PyArena *arena)
{
    PyObject *re = PyLong_FromLong(n);
    if (re == NULL)
    {
        return NULL;
    }
    if (_PyArena_AddPyObject(arena, re) < 0)
    {
        // Registration failed, so the arena did not take our reference.
        Py_DECREF(re);
        return NULL;
    }
    // check obj2ast_constant from CPython source code
    Py_INCREF(re);
    return re;
}
// Defines <type>_copy for a <type>_ty value that needs no real copy: the
// function returns its argument unchanged and ignores the arena.
#define TRIAL_COPY(type) \
type##_ty type##_copy(type##_ty val, PyArena *arena) \
{ \
return val; \
}
TRIAL_COPY(boolop)
TRIAL_COPY(operator)
TRIAL_COPY(unaryop)
TRIAL_COPY(cmpop)
TRIAL_COPY(expr_context)
/* Copy an identifier by delegating to PyUnicode_Copy_Arena. */
identifier identifier_copy(identifier val, PyArena *arena)
{
    identifier duplicate = PyUnicode_Copy_Arena(val, arena);
    return duplicate;
}
/* Copy a string field by delegating to PyUnicode_Copy_Arena. */
PyObject *string_copy(PyObject *val, PyArena *arena)
{
    PyObject *duplicate = PyUnicode_Copy_Arena(val, arena);
    return duplicate;
}
/*
 * Copy a constant object into `arena`.
 *
 * Exact ints and exact strs are duplicated with PyLong_Copy_Arena and
 * PyUnicode_Copy_Arena. Any other type is reported through ERROR and then
 * shared: the original object is registered with the arena and returned.
 *
 * Returns NULL if `val` is NULL (matching the other copy helpers) or if the
 * object cannot be registered with the arena.
 */
PyObject *constant_copy(PyObject *val, PyArena *arena)
{
    if (val == NULL)
    {
        return NULL;
    }
    if (PyLong_CheckExact(val))
    {
        return PyLong_Copy_Arena(val, arena);
    }
    if (PyUnicode_CheckExact(val))
    {
        return PyUnicode_Copy_Arena(val, arena);
    }
    ERROR("Unknown constant type, use fallback - %s\n", Py_TYPE(val)->tp_name);
    if (_PyArena_AddPyObject(arena, val) < 0)
    {
        return NULL;
    }
    Py_INCREF(val);
    return val;
}
// Defines all three copy helpers (copy_add_front, copy_add, copy) for
// asdl_<type>_seq. The macros used here are defined further down; that is
// fine because they are only expanded where this macro is invoked.
#define ASDL_SEQ_COPY_COMBINED(type) \
ASDL_SEQ_COPY_ADD_FRONT(type) \
ASDL_SEQ_COPY_ADD(type) \
ASDL_SEQ_COPY(type)
// Defines asdl_<type>_seq_copy_add(seq, arena, add_size): a new sequence with
// room for `add_size` more elements than `seq`, whose leading elements are
// copies of the elements of `seq`. A NULL `seq` is treated as empty.
// Returns NULL if the new sequence cannot be allocated.
#define ASDL_SEQ_COPY_ADD(type) \
    asdl_##type##_seq *asdl_##type##_seq##_copy_add(asdl_##type##_seq *seq, PyArena *arena, int add_size) \
    { \
        Py_ssize_t old_size = (seq == NULL) ? 0 : seq->size; \
        asdl_##type##_seq *re = _Py_##asdl_##type##_seq##_new(old_size + add_size, arena); \
        if (re == NULL) \
        { \
            return NULL; \
        } \
        for (Py_ssize_t i = 0; i < old_size; i++) \
        { \
            re->typed_elements[i] = type##_copy(seq->typed_elements[i], arena); \
        } \
        return re; \
    }
// Defines asdl_<type>_seq_copy_add_front(seq, arena, add_size): a new
// sequence with `add_size` free slots at the front, followed by copies of the
// elements of `seq`. A NULL `seq` is treated as empty.
// The copied elements are shifted by `add_size`. For add_size == 1 this is
// the same layout as a fixed shift of one; a fixed shift would write past the
// end for add_size == 0 and leave only one front slot for add_size > 1.
// Returns NULL if the new sequence cannot be allocated.
#define ASDL_SEQ_COPY_ADD_FRONT(type) \
    asdl_##type##_seq *asdl_##type##_seq##_copy_add_front(asdl_##type##_seq *seq, PyArena *arena, int add_size) \
    { \
        Py_ssize_t old_size = (seq == NULL) ? 0 : seq->size; \
        asdl_##type##_seq *re = _Py_##asdl_##type##_seq##_new(old_size + add_size, arena); \
        if (re == NULL) \
        { \
            return NULL; \
        } \
        for (Py_ssize_t i = 0; i < old_size; i++) \
        { \
            re->typed_elements[i + add_size] = type##_copy(seq->typed_elements[i], arena); \
        } \
        return re; \
    }
// Defines asdl_<type>_seq_copy(seq, arena): a new sequence of the same size
// whose elements are copies of the elements of `seq`.
// Returns NULL if `seq` is NULL or the new sequence cannot be allocated.
#define ASDL_SEQ_COPY(type) \
    asdl_##type##_seq *asdl_##type##_seq##_copy(asdl_##type##_seq *seq, PyArena *arena) \
    { \
        if (seq == NULL) \
        { \
            return NULL; \
        } \
        asdl_##type##_seq *re = _Py_##asdl_##type##_seq##_new(seq->size, arena); \
        if (re == NULL) \
        { \
            return NULL; \
        } \
        for (Py_ssize_t i = 0; i < seq->size; i++) \
        { \
            re->typed_elements[i] = type##_copy(seq->typed_elements[i], arena); \
        } \
        return re; \
    }
// Sequence copy helpers for expr, stmt and keyword sequences.
ASDL_SEQ_COPY_COMBINED(expr)
ASDL_SEQ_COPY_COMBINED(stmt)
ASDL_SEQ_COPY_COMBINED(keyword)
/*
 * Copy an int sequence; elements are plain values and are copied directly.
 *
 * Returns NULL if `seq` is NULL (like the macro-generated sequence copies) or
 * if the new sequence cannot be allocated.
 */
asdl_int_seq *asdl_int_seq_copy(asdl_int_seq *seq, PyArena *arena)
{
    if (seq == NULL)
    {
        return NULL;
    }
    asdl_int_seq *re = _Py_asdl_int_seq_new(seq->size, arena);
    if (re == NULL)
    {
        return NULL;
    }
    for (Py_ssize_t i = 0; i < seq->size; i++)
    {
        re->typed_elements[i] = seq->typed_elements[i];
    }
    return re;
}
/*
 * Copy an int sequence into a new one with room for `add_size` more
 * elements; the original elements keep their positions. A NULL `seq` is
 * treated as empty, like the macro-generated *_copy_add helpers.
 *
 * Returns NULL if the new sequence cannot be allocated.
 */
asdl_int_seq *asdl_int_seq_copy_add(asdl_int_seq *seq, PyArena *arena, int add_size)
{
    Py_ssize_t old_size = (seq == NULL) ? 0 : seq->size;
    asdl_int_seq *re = _Py_asdl_int_seq_new(old_size + add_size, arena);
    if (re == NULL)
    {
        return NULL;
    }
    for (Py_ssize_t i = 0; i < old_size; i++)
    {
        re->typed_elements[i] = seq->typed_elements[i];
    }
    return re;
}
/*
 * Same as asdl_int_seq_copy_add, but the `add_size` free slots are at the
 * front and the original elements follow them. deepcopy.h declares this
 * function through ASDL_SEQ_COPY_HEADER(asdl_int_seq).
 */
asdl_int_seq *asdl_int_seq_copy_add_front(asdl_int_seq *seq, PyArena *arena, int add_size)
{
    Py_ssize_t old_size = (seq == NULL) ? 0 : seq->size;
    asdl_int_seq *re = _Py_asdl_int_seq_new(old_size + add_size, arena);
    if (re == NULL)
    {
        return NULL;
    }
    for (Py_ssize_t i = 0; i < old_size; i++)
    {
        re->typed_elements[i + add_size] = seq->typed_elements[i];
    }
    return re;
}
// Sequence copy helpers for the remaining sequence types declared in
// deepcopy.h.
ASDL_SEQ_COPY_COMBINED(arg)
ASDL_SEQ_COPY_COMBINED(comprehension)
ASDL_SEQ_COPY_COMBINED(type_param)
ASDL_SEQ_COPY_COMBINED(alias)
ASDL_SEQ_COPY_COMBINED(excepthandler)
ASDL_SEQ_COPY_COMBINED(type_ignore)
ASDL_SEQ_COPY_COMBINED(withitem)
ASDL_SEQ_COPY_COMBINED(match_case)
ASDL_SEQ_COPY_COMBINED(identifier)
ASDL_SEQ_COPY_COMBINED(pattern)

Default

Code generation for a default constructor for every type in the Python AST tree:

import pathlib
from glob import glob
import re
import sys
def extract_enum_kinds(source):
    """Collect the members of every ``enum <name>_kind { ... };`` declaration.

    Any amount of whitespace (including newlines) is accepted after ``enum``
    and between the closing brace and the semicolon.

    Args:
        source: Text of the C header.

    Returns:
        A dict mapping each enum name (e.g. ``"_expr_kind"``) to the list of
        its ``*_kind`` member names, in declaration order.
    """
    enum_pattern = r"enum\s+(\w+_kind)\s*\{([^}]+)\}\s*;"
    return {
        match.group(1): re.findall(r"\b(\w+_kind)\b", match.group(2))
        for match in re.finditer(enum_pattern, source)
    }
def extract_pyast_constructors(source):
    """Find the ``_PyAST_*`` constructor prototypes in the header text.

    Only declarations of the form ``PyAPI_FUNC(<x>_ty) _PyAST_<Name>(...);``
    are matched.

    Args:
        source: Text of the C header.

    Returns:
        A list of dicts with the keys ``return_type``, ``name`` and ``args``;
        ``args`` holds one whitespace-trimmed string per parameter. An empty
        or ``void`` parameter list yields ``[]`` rather than a bogus entry
        that the code consuming ``args`` could not split into type and name.
    """
    pattern = re.compile(r"PyAPI_FUNC\((\w+_ty)\)\s+(_PyAST_\w+)\s*\(([^)]*)\);")
    constructors = []
    for match in pattern.finditer(source):
        # Join continued / wrapped lines before splitting the parameters.
        raw_args = match.group(3).replace("\\\n", "").replace("\n", " ")
        args = [
            arg.strip()
            for arg in raw_args.split(",")
            if arg.strip() not in ("", "void")
        ]
        constructors.append(
            {"return_type": match.group(1), "name": match.group(2), "args": args}
        )
    return constructors
# TODO change me
HEADER_PATH = "cpython_bin/include/python*/internal/pycore_ast.h"
# Resolve the glob pattern to one concrete file. Matches are sorted so the
# choice is reproducible when several Python versions are installed, and a
# missing header is reported with the pattern that was tried instead of a
# bare IndexError.
_header_matches = sorted(glob(HEADER_PATH))
if not _header_matches:
    raise FileNotFoundError(
        f"no file matches {HEADER_PATH!r}; adjust HEADER_PATH in this script"
    )
HEADER_PATH = pathlib.Path(_header_matches[0]).resolve()
# C wrapper emitted once per constructor. The slots are, in order: return
# type, constructor name (appended to "default"), constructor name again for
# the call, and the comma-separated argument expressions.
FUNC_TEMPLATE = """
%s default%s(ast_data_t *data){
return %s(%s);
}
"""
# Generated header. The single %s slot receives the members of Node_kind.
# NOTE(review): ast_data_t is also typedef'd in the hand-written default.h,
# and the generated source includes both headers - confirm that this does not
# clash when compiling.
header_content = """
#ifndef DEFAULT_GEN_H
#define DEFAULT_GEN_H
#define Py_BUILD_CORE 1
#include "Python.h"
#include "pycore_ast.h"
typedef struct {
PyArena *arena;
} ast_data_t;
typedef enum Node_kind {
%s
} Node_kind_t;
extern const int node_kind_cnt;
extern const int table_size[];
extern const void(** implementation_tables[])(ast_data_t *);
#endif
""".strip()
# Start of the generated source. %d receives the number of enums and %s the
# comma-separated number of constructors per enum.
source_content = """
#include "default_gen.h"
#include "default.h"
const int node_kind_cnt = %d;
const int table_size[] = {%s};
""".strip()
# Exactly one positional argument is required: the output path prefix.
if len(sys.argv[1:]) != 1:
    print("Usage: python pycore_ast.py <output_file>")
    sys.exit(1)

print("parse from", HEADER_PATH)
with open(HEADER_PATH, "r") as f:
    source = f.read()

enums = extract_enum_kinds(source)
# `_mod_kind` is dropped, so no table is generated for it below.
del enums["_mod_kind"]
constructors = extract_pyast_constructors(source)
# make a C style dict, enum -> function pointers
# An enum `_<base>_kind` owns the constructors whose return type is exactly
# `<base>_ty`. Comparing for equality (rather than testing for a substring)
# keeps a constructor out of the table of an enum whose base name merely
# occurs inside a longer type name.
implementation_tables = {
    enum_name: [
        constructor
        for constructor in constructors
        if constructor["return_type"]
        == enum_name.removeprefix("_").removesuffix("_kind") + "_ty"
    ]
    for enum_name in enums
}
# Fill the source preamble: number of enums, then the size of each table.
_table_sizes = ", ".join(
    str(len(funcs)) for funcs in implementation_tables.values()
)
source_content = source_content % (len(enums), _table_sizes)

# Fill the header: one `<enum name> = <index>,` member per enum.
_node_kind_members = "\n ".join(
    f"{name} = {i}," for i, name in enumerate(enums)
)
header_content = header_content % (_node_kind_members,)
# For every enum, emit one `default<constructor>` wrapper per constructor and
# then the array of pointers to those wrappers.
for enum_name, functions in implementation_tables.items():
    for func in functions:
        call_args = []
        for a in func["args"]:
            # Collapse redundant whitespace: tokens[0] is the type and
            # tokens[1] starts with "*" for pointer parameters.
            tokens = a.split()
            if not tokens[1].startswith("*"):
                call_args.append(f"default_{tokens[0]}(data)")
            elif tokens[0] == "PyArena":
                # The arena comes from the shared data instead of a default.
                call_args.append("data->arena")
            else:
                call_args.append(f"default_{tokens[0]}_ptr(data)")
        source_content += FUNC_TEMPLATE % (
            func["return_type"],
            func["name"],
            func["name"],
            ", ".join(call_args),
        )
    table_entries = "\n ".join(
        f"(void *)default{func['name']}," for func in functions
    ).removesuffix(",")
    source_content += """
void(* %s_funcs[])(ast_data_t *) = {
%s
};
""" % (enum_name, table_entries)
# Emit the master table that lists the per-enum function tables, in the same
# order as the enum members written to the header.
_table_rows = "\n ".join(f"{name}_funcs," for name in enums).removesuffix(",")
source_content += """
const void(** implementation_tables[])(ast_data_t *) = {
%s
};
""" % (_table_rows,)

# The only CLI argument is the output path without extension.
output_prefix = sys.argv[1]
for _suffix, _text in ((".c", source_content), (".h", header_content)):
    with open(output_prefix + _suffix, "w") as f:
        f.write(_text)

and some manually implemented functions:

default.h
// Hand-written providers of default values for constructor arguments.
// The generator script calls them as default_<type>(data), or
// default_<type>_ptr(data) for pointer parameters.
#ifndef AST_DEFAULT_H
#define AST_DEFAULT_H
#define Py_BUILD_CORE 1
#include "Python.h"
#include "pycore_ast.h"
// Shared state handed to every default_* function.
// NOTE(review): default.c reads data->gen_name_cnt, which is not a member of
// this struct as shown here - confirm where the full definition lives.
typedef struct {
PyArena *arena;
} ast_data_t;
// Defaults for single values.
identifier default_identifier(ast_data_t *data);
arguments_ty default_arguments_ty(ast_data_t *data);
stmt_ty default_stmt_ty(ast_data_t *data);
string default_string(ast_data_t *data);
expr_ty default_expr_ty(ast_data_t *data);
int default_int(ast_data_t *data);
constant default_constant(ast_data_t *data);
expr_context_ty default_expr_context_ty(ast_data_t *data);
// NOTE(review): the next two names lack the "_ty" suffix the other value
// providers carry - confirm this matches the names the generator emits.
alias_ty default_alias(ast_data_t *data);
unaryop_ty default_unaryop(ast_data_t *data);
operator_ty default_operator_ty(ast_data_t *data);
// Defaults for sequence (pointer) parameters.
asdl_arguments_seq *default_asdl_arguments_seq_ptr(ast_data_t *data);
asdl_identifier_seq *default_asdl_identifier_seq_ptr(ast_data_t *data);
asdl_keyword_seq *default_asdl_keyword_seq_ptr(ast_data_t *data);
asdl_stmt_seq *default_asdl_stmt_seq_ptr(ast_data_t *data);
asdl_expr_seq *default_asdl_expr_seq_ptr(ast_data_t *data);
asdl_type_param_seq *default_asdl_type_param_seq_ptr(ast_data_t *data);
asdl_withitem_seq *default_asdl_withitem_seq_ptr(ast_data_t *data);
asdl_match_case_seq *default_asdl_match_case_seq_ptr(ast_data_t *data);
asdl_excepthandler_seq *default_asdl_excepthandler_seq_ptr(ast_data_t *data);
asdl_alias_seq *default_asdl_alias_seq_ptr(ast_data_t *data);
#endif // AST_DEFAULT_H
default.c
#include "default.h"
#include "ast.h"
// Returns an identifier obtained from gen_name_id() for the current value of
// data->gen_name_cnt, and post-increments that counter.
// NOTE(review): gen_name_id and gen_name_cnt are not defined in the code
// shown here (presumably they come from "ast.h") - confirm.
identifier default_identifier(ast_data_t *data) {
return gen_name_id(data->gen_name_cnt++);
}
/* Default `arguments` node: an empty (zero-length) args sequence and NULL
 * for every other field. */
arguments_ty default_arguments_ty(ast_data_t *data)
{
    PyArena *arena = data->arena;
    asdl_arg_seq *no_args = _Py_asdl_arg_seq_new(0, arena);
    return _PyAST_arguments(NULL, no_args, NULL, NULL, NULL, NULL, NULL, arena);
}
/* Default statement: a `pass` node allocated in the arena of `data`. */
stmt_ty default_stmt_ty(ast_data_t *data)
{
    stmt_ty pass_stmt = _PyAST_Pass(LINE, data->arena);
    return pass_stmt;
}
/* Default string: none is synthesised, callers receive NULL. */
string default_string(ast_data_t *data)
{
    return NULL;
}
/*
 * Default alias node with a freshly generated name and asname.
 *
 * The two identifiers are generated in separate statements: each call
 * advances the name counter, and the evaluation order of function arguments
 * is unspecified in C, so nesting both calls in the argument list would make
 * it compiler-dependent which identifier becomes the name.
 */
alias_ty default_alias(ast_data_t *data)
{
    identifier name = default_identifier(data);
    identifier asname = default_identifier(data);
    return _PyAST_alias(name, asname, LINE, data->arena);
}
/*
 * Default expression: a Constant node holding the integer 0.
 *
 * _PyLong_GetZero() yields a PyObject *, not an AST node, so it has to be
 * wrapped in a Constant expression before it can be returned as expr_ty.
 */
expr_ty default_expr_ty(ast_data_t *data)
{
    return _PyAST_Constant(_PyLong_GetZero(), NULL, LINE, data->arena);
}
/* Default int argument: 0. */
int default_int(ast_data_t *data)
{
    return 0;
}

/* Default constant: the object returned by _PyLong_GetZero(). */
constant default_constant(ast_data_t *data)
{
    return _PyLong_GetZero();
}

/* Default expression context: Load. */
expr_context_ty default_expr_context_ty(ast_data_t *data)
{
    return Load;
}

/* Default binary operator: Add. */
operator_ty default_operator_ty(ast_data_t *data)
{
    return Add;
}
/*
 * Default arguments sequence: one element produced by default_arguments_ty.
 * Returns NULL if the sequence cannot be allocated.
 */
asdl_arguments_seq *default_asdl_arguments_seq_ptr(ast_data_t *data)
{
    asdl_arguments_seq *tmp = _Py_asdl_arguments_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_arguments_ty(data);
    return tmp;
}
/*
 * Default identifier sequence: one element produced by default_identifier.
 * Returns NULL if the sequence cannot be allocated.
 */
asdl_identifier_seq *default_asdl_identifier_seq_ptr(ast_data_t *data)
{
    asdl_identifier_seq *tmp = _Py_asdl_identifier_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_identifier(data);
    return tmp;
}
/* Default keyword sequence: a zero-length sequence. */
asdl_keyword_seq *default_asdl_keyword_seq_ptr(ast_data_t *data)
{
    asdl_keyword_seq *empty = _Py_asdl_keyword_seq_new(0, data->arena);
    return empty;
}
/*
 * Default statement sequence: one element produced by default_stmt_ty.
 *
 * default_stmt_ty takes the ast_data_t pointer itself; passing data->arena
 * would make it read a PyArena as if it were an ast_data_t.
 * Returns NULL if the sequence cannot be allocated.
 */
asdl_stmt_seq *default_asdl_stmt_seq_ptr(ast_data_t *data)
{
    asdl_stmt_seq *tmp = _Py_asdl_stmt_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_stmt_ty(data);
    return tmp;
}
/*
 * Default expression sequence: one element produced by default_expr_ty.
 * Returns NULL if the sequence cannot be allocated.
 */
asdl_expr_seq *default_asdl_expr_seq_ptr(ast_data_t *data)
{
    asdl_expr_seq *tmp = _Py_asdl_expr_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_expr_ty(data);
    return tmp;
}
/* Default type-parameter sequence: a zero-length sequence. */
asdl_type_param_seq *default_asdl_type_param_seq_ptr(ast_data_t *data)
{
    asdl_type_param_seq *empty = _Py_asdl_type_param_seq_new(0, data->arena);
    return empty;
}
/*
 * Default withitem sequence: one withitem whose context expression comes
 * from default_expr_ty and which has no `as` target.
 *
 * The slot holds a withitem_ty, so a real withitem node is built here;
 * storing a Pass statement would put a node of the wrong struct type into
 * the sequence.
 * Returns NULL if the sequence cannot be allocated.
 */
asdl_withitem_seq *default_asdl_withitem_seq_ptr(ast_data_t *data)
{
    asdl_withitem_seq *tmp = _Py_asdl_withitem_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = _PyAST_withitem(default_expr_ty(data), NULL, data->arena);
    return tmp;
}
/*
 * Default match_case sequence: one case with a MatchAs pattern that has no
 * sub-pattern and no name, no guard, and a body holding one default
 * statement.
 *
 * The slot holds a match_case_ty, so a real match_case node is built here;
 * storing a Pass statement would put a node of the wrong struct type into
 * the sequence.
 * Returns NULL if a sequence cannot be allocated.
 */
asdl_match_case_seq *default_asdl_match_case_seq_ptr(ast_data_t *data)
{
    asdl_match_case_seq *tmp = _Py_asdl_match_case_seq_new(1, data->arena);
    asdl_stmt_seq *body = _Py_asdl_stmt_seq_new(1, data->arena);
    if (tmp == NULL || body == NULL)
    {
        return NULL;
    }
    body->typed_elements[0] = default_stmt_ty(data);
    pattern_ty wildcard = _PyAST_MatchAs(NULL, NULL, LINE, data->arena);
    tmp->typed_elements[0] = _PyAST_match_case(wildcard, NULL, body, data->arena);
    return tmp;
}
/*
 * Default excepthandler sequence: one handler with no exception type, no
 * name, and a body holding one default statement.
 *
 * The slot holds an excepthandler_ty, so a real ExceptHandler node is built
 * here; storing a Pass statement would put a node of the wrong struct type
 * into the sequence.
 * Returns NULL if a sequence cannot be allocated.
 */
asdl_excepthandler_seq *default_asdl_excepthandler_seq_ptr(ast_data_t *data)
{
    asdl_excepthandler_seq *tmp =
        _Py_asdl_excepthandler_seq_new(1, data->arena);
    asdl_stmt_seq *body = _Py_asdl_stmt_seq_new(1, data->arena);
    if (tmp == NULL || body == NULL)
    {
        return NULL;
    }
    body->typed_elements[0] = default_stmt_ty(data);
    tmp->typed_elements[0] = _PyAST_ExceptHandler(NULL, NULL, body, LINE, data->arena);
    return tmp;
}
/*
 * Default alias sequence: one element produced by default_alias.
 * Returns NULL if the sequence cannot be allocated.
 */
asdl_alias_seq *default_asdl_alias_seq_ptr(ast_data_t *data)
{
    asdl_alias_seq *tmp = _Py_asdl_alias_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_alias(data);
    return tmp;
}
/* Default unary operator: Invert. */
unaryop_ty default_unaryop(ast_data_t *data)
{
    return Invert;
}

How to use

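Run deepcopy_ast.py with the path to pycore_ast.h as the first argument and the destination filename (without extension) as the second argument. default_gen.py takes only the destination filename (without extension); it locates pycore_ast.h through the HEADER_PATH setting inside the script.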
E.g:

Terminal window
python deepcopy_ast.py $(readlink -f $CPYTHON_BIN_PATH/include/python3.*/internal/pycore_ast.h) src/codgen/deepcopy_gen
python default_gen.py src/codgen/default_gen

Then add all of the above files to your project.
For reference, check the CPython-AST-Fuzzer project.