CPython internal AST structure deep-copy functions
/ 10 min read
Table of Contents
Deep copy
Code generation for deep-copying a Python AST tree
#### This script is used to generate deepcopy functions for CPython internal AST nodes.# License: MIT# Author: Eritque arcus# Date: July 20, 2024###
import sys
# Exactly two command-line arguments are required: the header to parse and
# the output path stem (".c" / ".h" are appended when writing).
if len(sys.argv) != 3:
    print("Usage: python deepcopy_ast.py <header> <output>")
    sys.exit(1)

# Node types laid out as `struct _X { kind; union { ... } v; <tail fields> }`.
TARGETS = ["expr", "stmt", "excepthandler", "pattern", "type_ignore", "type_param", "mod"]
# Node types that are plain structs (no `kind` tag, no union).
INDIVIDUAL_TARGETS = ["comprehension", "arguments", "arg", "keyword", "alias", "withitem", "match_case"]

# C skeleton for a tagged-union node. BODY is replaced by the per-kind `case`
# blocks and OTHER by the copies of the fields that follow the union.
# The allocation result is checked so the generated code never writes through
# a NULL pointer when the arena cannot allocate.
UNION_TARGET_TEMPLATE = """
%s_ty %s_copy(%s_ty val, PyArena *arena)
{
   if(val == NULL){
      return NULL;
   }
   %s_ty re = _PyArena_Malloc(arena, sizeof(struct _%s));
   if(re == NULL){
      return NULL;
   }
   re->kind = val->kind;
   switch (val->kind) {
   BODY
   }
   OTHER
   return re;
}
"""

# Statement kinds that have no struct inside the union, so the struct scan in
# gen_targets() never emits a case for them.
UNION_STMT_PATCH = """
case Pass_kind:
case Break_kind:
case Continue_kind:
   break;
""".strip()

# C skeleton for a plain struct node. BODY is replaced by the field copies.
INDIVIDUAL_TARGET_TEMPLATE = """
%s_ty %s_copy(%s_ty val, PyArena *arena)
{
   if(val == NULL){
      return NULL;
   }
   %s_ty re = _PyArena_Malloc(arena, sizeof(struct _%s));
   if(re == NULL){
      return NULL;
   }
   BODY
   return re;
}
"""

# Read the header once; every generator function works on this list of lines.
header = sys.argv[1]
with open(header, "r", encoding="utf-8") as f:
    codes_raw = f.read().strip().split("\n")
def parse_struct(start: int, codes: list[str], name: str = ""):
    """Turn the fields of one C struct into C statements that copy them.

    ``codes[start]`` is the line that opens the struct. Every following line up
    to (not including) the first line that starts with ``}`` is read as a
    ``<type> <field>;`` declaration.

    Args:
        start: Index in ``codes`` of the line that opens the struct.
        codes: Header lines.
        name: Union member name. ``""`` (default) reads it from the closing
            ``} Name;`` line; ``None`` means the fields live directly on the
            node (``re->field``) instead of under ``re->v.<name>``.

    Returns:
        ``(end, code)`` where ``end`` is the index of the closing line and
        ``code`` is the generated C text, indented by three spaces. When a
        name is in effect the text is wrapped as ``case <name>_kind: ... break;``.

    Raises:
        ValueError: If no closing ``}`` line exists or a field line has no
            declarator.
    """
    end = start + 1
    while end < len(codes) and not codes[end].strip().startswith("}"):
        end += 1
    if end >= len(codes):
        raise ValueError(f"no closing '}}' found for the struct opened at line index {start}")

    if name == "":
        # "} Name;" -> "Name"; all whitespace (including tabs) is ignored.
        name = "".join(codes[end].split()).removeprefix("}").removesuffix(";")

    target_prefix = "" if name is None else f"v.{name}."
    statements = []
    for line in codes[start + 1:end]:
        # split() without an argument tolerates tabs and repeated spaces.
        tokens = line.split()
        if not tokens:
            continue
        if len(tokens) < 2:
            raise ValueError(f"cannot parse struct field: {line!r}")
        # Joining the remaining tokens handles both "T *x;" and "T * x;".
        field = "".join(tokens[1:]).removesuffix(";").removeprefix("*")
        c_type = tokens[0].removesuffix("_ty")
        if c_type == "int":
            # Plain ints are copied by value.
            statements.append(f"re->{target_prefix}{field} = val->{target_prefix}{field};")
        else:
            # Everything else goes through the matching <type>_copy() function.
            statements.append(
                f"re->{target_prefix}{field} = {c_type}_copy(val->{target_prefix}{field}, arena);"
            )

    parse_code = "\n" + "".join(statement + "\n" for statement in statements)
    parse_code = parse_code.replace("\n", "\n" + " " * 3)
    if name is not None:
        parse_code = f"case {name}_kind:{parse_code}break;"
    return (end, parse_code)
def gen_targets(codes: list[str], target: str):
    """Generate the C ``<target>_copy`` function for a tagged-union node type.

    The header is expected to contain ``struct _<target> {``, then a union whose
    members are anonymous ``struct { ... } Name;`` blocks, closed by ``} v;``,
    optionally followed by common fields, and finally ``};``.

    Args:
        codes: Lines of the header file.
        target: Node type name without the ``_ty`` suffix (e.g. ``"expr"``).

    Returns:
        The C source of the copy function as a string.

    Raises:
        ValueError: If the struct or the end of its union cannot be found.
            Failing here avoids emitting a function that still contains the
            literal ``OTHER`` placeholder.
    """
    opener = "struct_%s{" % target
    i = 0
    while i < len(codes) and codes[i].replace(" ", "") != opener:
        i += 1
    if i == len(codes):
        raise ValueError(f"'struct _{target} {{' not found in header")
    codes = codes[i + 1:]

    # Everything before "} v;" is the union; what follows up to "};" is the
    # set of fields shared by every kind.
    i = 0
    while i < len(codes) and codes[i].replace(" ", "") != "}v;":
        i += 1
    if i == len(codes):
        raise ValueError(f"end of union ('}} v;') not found for struct _{target}")
    body_codes = codes[:i]
    ii = i
    while ii < len(codes) and codes[ii].replace(" ", "") != "};":
        ii += 1
    other_codes = codes[i:ii + 1]

    template = UNION_TARGET_TEMPLATE % (target, target, target, target, target)

    # One `case` block per anonymous struct inside the union.
    body = ""
    i = 0
    while True:
        while i < len(body_codes) and body_codes[i].replace(" ", "") != "struct{":
            i += 1
        if i == len(body_codes):
            break
        # parse_struct returns the index of the closing line; scanning resumes there.
        i, code = parse_struct(i, body_codes)
        body += code + "\n"
    if target == "stmt":
        body += UNION_STMT_PATCH
    body = body.replace("\n", "\n" + " " * 6)

    # Always substitute OTHER so the placeholder can never leak into the output.
    other = ""
    if len(other_codes) > 1:
        # other_codes[0] is the "} v;" line, which parse_struct treats as the opener.
        _, other = parse_struct(0, other_codes, None)
    template = template.replace("OTHER", other)

    return template.replace("BODY", " " * 3 + body)
def gen_individual_targets(codes: list[str], target: str):
    """Generate the C ``<target>_copy`` function for a plain struct node type.

    Args:
        codes: Lines of the header file.
        target: Node type name without the ``_ty`` suffix (e.g. ``"keyword"``).

    Returns:
        The C source of the copy function as a string.

    Raises:
        ValueError: If ``struct _<target> {`` is not present in ``codes``.
    """
    opener = "struct_%s{" % target
    i = 0
    while i < len(codes) and codes[i].replace(" ", "") != opener:
        i += 1
    if i == len(codes):
        raise ValueError(f"'struct _{target} {{' not found in header")
    codes = codes[i:]

    # Spaces are ignored when looking for the closing line, the same way
    # gen_targets() compares lines.
    i = 0
    while i < len(codes) and codes[i].replace(" ", "") != "};":
        i += 1
    codes = codes[:i + 1]

    template = INDIVIDUAL_TARGET_TEMPLATE % (target, target, target, target, target)
    # name=None: fields are accessed directly on the node, not through a union.
    _, body = parse_struct(0, codes, None)
    return template.replace("BODY", " " * 3 + body)
# Both outputs start from a fixed preamble; the header also opens its include guard.
FILE_CONTENT = "#include \"deepcopy_gen.h\"\n"
HEADER_CONTENT = "#ifndef DEEPCOPY_GEN_H\n#define DEEPCOPY_GEN_H\n\n#include \"deepcopy.h\"\n\n"

# Union-style nodes first, then plain structs; each one contributes a function
# definition to the .c file and its prototype to the .h file.
for generate, names in ((gen_targets, TARGETS), (gen_individual_targets, INDIVIDUAL_TARGETS)):
    for t in names:
        FILE_CONTENT += generate(codes_raw, t) + "\n"
        HEADER_CONTENT += f"{t}_ty {t}_copy({t}_ty val, PyArena *arena);\n"

# The second CLI argument is the output stem; the header gets its closing #endif here.
for suffix, text in ((".c", FILE_CONTENT), (".h", HEADER_CONTENT + "\n#endif")):
    with open(sys.argv[2] + suffix, "w", encoding="utf-8") as f:
        f.write(text)
and some manually written implementations
#ifndef DEEPCOPY_H
#define DEEPCOPY_H

// pycore_ast.h is a CPython internal header; Py_BUILD_CORE is defined first so
// that it can be included.
#define Py_BUILD_CORE 1
#include "Python.h"
#include "pycore_ast.h"

// Presumably the header written by the generator script (it uses the
// DEEPCOPY_GEN_H guard and includes this file back) -- confirm the output path.
#include "deepcopy_gen.h"

expr_ty expr_copy(expr_ty val, PyArena *arena);
stmt_ty stmt_copy(stmt_ty val, PyArena *arena);

// Declares the three copy variants for one asdl sequence type:
//   <type>_copy            - copy of the sequence
//   <type>_copy_add        - copy with add_size extra slots
//   <type>_copy_add_front  - same, front variant
#define ASDL_SEQ_COPY_HEADER(type) \
    type *type##_copy(type *seq, PyArena *arena); \
    type *type##_copy_add(type *seq, PyArena *arena, int add_size); \
    type *type##_copy_add_front(type *seq, PyArena *arena, int add_size);

ASDL_SEQ_COPY_HEADER(asdl_expr_seq)
ASDL_SEQ_COPY_HEADER(asdl_stmt_seq)
ASDL_SEQ_COPY_HEADER(asdl_keyword_seq)
ASDL_SEQ_COPY_HEADER(asdl_int_seq)
ASDL_SEQ_COPY_HEADER(asdl_arg_seq)
ASDL_SEQ_COPY_HEADER(asdl_type_param_seq)
ASDL_SEQ_COPY_HEADER(asdl_comprehension_seq)
ASDL_SEQ_COPY_HEADER(asdl_alias_seq)
ASDL_SEQ_COPY_HEADER(asdl_excepthandler_seq)
ASDL_SEQ_COPY_HEADER(asdl_type_ignore_seq)
ASDL_SEQ_COPY_HEADER(asdl_withitem_seq)
ASDL_SEQ_COPY_HEADER(asdl_match_case_seq)
ASDL_SEQ_COPY_HEADER(asdl_identifier_seq)
ASDL_SEQ_COPY_HEADER(asdl_pattern_seq)

// Declares <type>_copy for the enum-valued node types, which are passed and
// returned by value. (The macro name is spelled "HEDER" in the original.)
#define TRIAL_COPY_HEDER(type) type##_ty type##_copy(type##_ty val, PyArena *arena);
TRIAL_COPY_HEDER(boolop)
TRIAL_COPY_HEDER(operator)
TRIAL_COPY_HEDER(unaryop)
TRIAL_COPY_HEDER(cmpop)
TRIAL_COPY_HEDER(expr_context)

// Copies of the PyObject-backed leaf values stored in AST nodes.
identifier identifier_copy(identifier val, PyArena *arena);
PyObject *constant_copy(PyObject *val, PyArena *arena);
PyObject *string_copy(PyObject *val, PyArena *arena);
// PyObject *object_copy(PyObject *val, PyArena *arena);

#endif // DEEPCOPY_H
#include "deepcopy.h"
// implement log funcs yourself#define INFO(fmt, ...) info(fmt, ##__VA_ARGS__)#define ERROR(fmt, ...) error(fmt, ##__VA_ARGS__)#define PANIC(fmt, ...) panic(fmt, ##__VA_ARGS__)
/*
 * Create a str object from the C string `s` and register it with `arena`.
 *
 * Returns the new object, or NULL (with a Python exception set) when the
 * string cannot be created or cannot be added to the arena.
 *
 * Ownership: on success _PyArena_AddPyObject() takes over the reference that
 * PyUnicode_FromString() returned, so the arena keeps the object alive and no
 * extra Py_INCREF is needed. (obj2ast_constant in CPython re-increments only
 * because its caller releases its own reference afterwards.) The previous
 * unconditional Py_INCREF leaked one reference per object.
 */
PyObject *PyUnicode_FromString_Arena(const char *s, PyArena *arena)
{
    PyObject *re = PyUnicode_FromString(s);
    if (re == NULL)
    {
        return NULL;
    }
    if (_PyArena_AddPyObject(arena, re) < 0)
    {
        /* On failure the arena did not take the reference; release it here. */
        Py_DECREF(re);
        return NULL;
    }
    return re;
}
/*
 * Copy the int object `s` into `arena`.
 *
 * Returns NULL when `s` is NULL, or (with an exception set) when the copy
 * cannot be created or registered with the arena.
 *
 * Values that do not fit in a long long used to be converted through
 * PyLong_AsLongLong(), which yields -1 and leaves an OverflowError pending.
 * They are now detected, and the original object is shared instead: ints are
 * immutable, so sharing is equivalent to a copy.
 *
 * Ownership: on success the arena owns the only reference held on behalf of
 * the caller; no extra Py_INCREF is performed (it would leak).
 */
PyObject *PyLong_Copy_Arena(PyObject *s, PyArena *arena)
{
    if (s == NULL)
    {
        return NULL;
    }

    int overflow = 0;
    long long value = PyLong_AsLongLongAndOverflow(s, &overflow);
    PyObject *re;
    if (overflow != 0)
    {
        /* Out of range for long long: keep the exact value by sharing `s`. */
        re = Py_NewRef(s);
    }
    else if (value == -1 && PyErr_Occurred())
    {
        return NULL;
    }
    else
    {
        re = PyLong_FromLongLong(value);
        if (re == NULL)
        {
            return NULL;
        }
    }

    if (_PyArena_AddPyObject(arena, re) < 0)
    {
        Py_DECREF(re);
        return NULL;
    }
    return re;
}
/*
 * Create a new str object with the same contents as `s` and register it with
 * `arena`.
 *
 * Returns NULL when `s` is NULL, or (with an exception set) when allocation or
 * arena registration fails.
 *
 * Ownership: on success the arena owns the reference returned by
 * PyUnicode_New(); no extra Py_INCREF is performed (it would leak).
 */
PyObject *PyUnicode_Copy_Arena(PyObject *s, PyArena *arena)
{
    if (s == NULL)
    {
        return NULL;
    }
    Py_ssize_t length = PyUnicode_GET_LENGTH(s);
    PyObject *re = PyUnicode_New(length, PyUnicode_MAX_CHAR_VALUE(s));
    if (re == NULL)
    {
        return NULL;
    }
    /* from _PyUnicode_Copy but no checks: the new string is created with the
       same max char value, so the raw character data is copied as-is. */
    memcpy(PyUnicode_DATA(re), PyUnicode_DATA(s), length * PyUnicode_KIND(s));
    if (_PyArena_AddPyObject(arena, re) < 0)
    {
        Py_DECREF(re);
        return NULL;
    }
    return re;
}
/*
 * Create an int object with value `n` and register it with `arena`.
 *
 * Returns the object, or NULL (with an exception set) when creation or arena
 * registration fails.
 *
 * Ownership: on success the arena owns the reference returned by
 * PyLong_FromLong(); no extra Py_INCREF is performed (it would leak).
 */
PyObject *PyLong_FromLong_Arena(long n, PyArena *arena)
{
    PyObject *re = PyLong_FromLong(n);
    if (re == NULL)
    {
        return NULL;
    }
    if (_PyArena_AddPyObject(arena, re) < 0)
    {
        Py_DECREF(re);
        return NULL;
    }
    return re;
}
#define TRIAL_COPY(type) \ type##_ty type##_copy(type##_ty val, PyArena *arena) \ { \ return val; \ }TRIAL_COPY(boolop)TRIAL_COPY(operator)TRIAL_COPY(unaryop)TRIAL_COPY(cmpop)TRIAL_COPY(expr_context)
/* Copy an identifier (a str object) into `arena`; NULL stays NULL. */
identifier identifier_copy(identifier val, PyArena *arena)
{
    return PyUnicode_Copy_Arena(val, arena);
}

/* Copy a `string` field (a str object) into `arena`; NULL stays NULL. */
PyObject *string_copy(PyObject *val, PyArena *arena)
{
    return PyUnicode_Copy_Arena(val, arena);
}

/*
 * Copy a constant value into `arena`.
 *
 * Exact ints and exact strs are duplicated. Any other object is logged and
 * then shared: it is registered with `arena` and returned as-is.
 *
 * Returns NULL when `val` is NULL (consistent with the other copy helpers,
 * instead of dereferencing it) or when arena registration fails.
 */
PyObject *constant_copy(PyObject *val, PyArena *arena)
{
    if (val == NULL)
    {
        return NULL;
    }
    if (PyLong_CheckExact(val))
    {
        return PyLong_Copy_Arena(val, arena);
    }
    else if (PyUnicode_CheckExact(val))
    {
        return PyUnicode_Copy_Arena(val, arena);
    }
    ERROR("Unknown constant type, use fallback - %s\n", Py_TYPE(val)->tp_name);
    /* `val` is borrowed from its current owner. _PyArena_AddPyObject() consumes
       one reference on success, so a new one is taken afterwards to give it
       back (same pattern as obj2ast_constant in CPython). */
    if (_PyArena_AddPyObject(arena, val) < 0)
    {
        return NULL;
    }
    return Py_NewRef(val);
}
/*
 * Defines all three sequence-copy functions for asdl_<type>_seq:
 * asdl_<type>_seq_copy_add_front, asdl_<type>_seq_copy_add and
 * asdl_<type>_seq_copy. The macros it refers to are defined further down,
 * which is fine because they are only expanded where this macro is used.
 */
#define ASDL_SEQ_COPY_COMBINED(type) \
    ASDL_SEQ_COPY_ADD_FRONT(type) \
    ASDL_SEQ_COPY_ADD(type) \
    ASDL_SEQ_COPY(type)
/*
 * Defines asdl_<type>_seq_copy_add(seq, arena, add_size): a deep copy of `seq`
 * with `add_size` extra slots after the copied elements. The extra slots are
 * left for the caller to fill. A NULL `seq` is treated as an empty sequence.
 *
 * Returns NULL when the new sequence cannot be allocated (previously the
 * unchecked result was written through). The index is Py_ssize_t to match the
 * type of `seq->size`.
 */
#define ASDL_SEQ_COPY_ADD(type) \
    asdl_##type##_seq *asdl_##type##_seq##_copy_add(asdl_##type##_seq *seq, PyArena *arena, int add_size) \
    { \
        Py_ssize_t old_size = (seq == NULL) ? 0 : seq->size; \
        asdl_##type##_seq *re = _Py_##asdl_##type##_seq##_new(old_size + add_size, arena); \
        if (re == NULL) \
        { \
            return NULL; \
        } \
        for (Py_ssize_t i = 0; i < old_size; i++) \
        { \
            re->typed_elements[i] = type##_copy(seq->typed_elements[i], arena); \
        } \
        return re; \
    }
/*
 * Defines asdl_<type>_seq_copy_add_front(seq, arena, add_size): a deep copy of
 * `seq` with `add_size` extra slots before the copied elements. The extra
 * slots are left for the caller to fill. A NULL `seq` is treated as an empty
 * sequence.
 *
 * The copied elements are shifted by `add_size`. The previous code always
 * shifted by exactly 1, which is the same for add_size == 1 but wrote one
 * element past the end of the new sequence for add_size == 0.
 *
 * Returns NULL when the new sequence cannot be allocated.
 */
#define ASDL_SEQ_COPY_ADD_FRONT(type) \
    asdl_##type##_seq *asdl_##type##_seq##_copy_add_front(asdl_##type##_seq *seq, PyArena *arena, int add_size) \
    { \
        Py_ssize_t old_size = (seq == NULL) ? 0 : seq->size; \
        asdl_##type##_seq *re = _Py_##asdl_##type##_seq##_new(old_size + add_size, arena); \
        if (re == NULL) \
        { \
            return NULL; \
        } \
        for (Py_ssize_t i = 0; i < old_size; i++) \
        { \
            re->typed_elements[i + add_size] = type##_copy(seq->typed_elements[i], arena); \
        } \
        return re; \
    }
/*
 * Defines asdl_<type>_seq_copy(seq, arena): a deep copy of `seq` in which each
 * element is copied with <type>_copy.
 *
 * Returns NULL when `seq` is NULL or when the new sequence cannot be
 * allocated (previously the unchecked result was written through).
 */
#define ASDL_SEQ_COPY(type) \
    asdl_##type##_seq *asdl_##type##_seq##_copy(asdl_##type##_seq *seq, PyArena *arena) \
    { \
        if (seq == NULL) \
        { \
            return NULL; \
        } \
        asdl_##type##_seq *re = _Py_##asdl_##type##_seq##_new(seq->size, arena); \
        if (re == NULL) \
        { \
            return NULL; \
        } \
        for (Py_ssize_t i = 0; i < seq->size; i++) \
        { \
            re->typed_elements[i] = type##_copy(seq->typed_elements[i], arena); \
        } \
        return re; \
    }
ASDL_SEQ_COPY_COMBINED(expr)
ASDL_SEQ_COPY_COMBINED(stmt)
ASDL_SEQ_COPY_COMBINED(keyword)

/*
 * asdl_int_seq holds plain ints, so its copies are written by hand: elements
 * are assigned directly instead of going through a <type>_copy function.
 * NULL handling mirrors the macro-generated functions.
 */

/* Copy of `seq`; NULL when `seq` is NULL or allocation fails. */
asdl_int_seq *asdl_int_seq_copy(asdl_int_seq *seq, PyArena *arena)
{
    if (seq == NULL)
    {
        return NULL;
    }
    asdl_int_seq *re = _Py_asdl_int_seq_new(seq->size, arena);
    if (re == NULL)
    {
        return NULL;
    }
    for (Py_ssize_t i = 0; i < seq->size; i++)
    {
        re->typed_elements[i] = seq->typed_elements[i];
    }
    return re;
}

/* Copy of `seq` with `add_size` extra slots at the end; NULL `seq` counts as empty. */
asdl_int_seq *asdl_int_seq_copy_add(asdl_int_seq *seq, PyArena *arena, int add_size)
{
    Py_ssize_t old_size = (seq == NULL) ? 0 : seq->size;
    asdl_int_seq *re = _Py_asdl_int_seq_new(old_size + add_size, arena);
    if (re == NULL)
    {
        return NULL;
    }
    for (Py_ssize_t i = 0; i < old_size; i++)
    {
        re->typed_elements[i] = seq->typed_elements[i];
    }
    return re;
}

/*
 * Copy of `seq` with `add_size` extra slots at the front; NULL `seq` counts as
 * empty. deepcopy.h declares this function through
 * ASDL_SEQ_COPY_HEADER(asdl_int_seq), but it had no definition.
 */
asdl_int_seq *asdl_int_seq_copy_add_front(asdl_int_seq *seq, PyArena *arena, int add_size)
{
    Py_ssize_t old_size = (seq == NULL) ? 0 : seq->size;
    asdl_int_seq *re = _Py_asdl_int_seq_new(old_size + add_size, arena);
    if (re == NULL)
    {
        return NULL;
    }
    for (Py_ssize_t i = 0; i < old_size; i++)
    {
        re->typed_elements[i + add_size] = seq->typed_elements[i];
    }
    return re;
}

ASDL_SEQ_COPY_COMBINED(arg)
ASDL_SEQ_COPY_COMBINED(comprehension)
ASDL_SEQ_COPY_COMBINED(type_param)
ASDL_SEQ_COPY_COMBINED(alias)
ASDL_SEQ_COPY_COMBINED(excepthandler)
ASDL_SEQ_COPY_COMBINED(type_ignore)
ASDL_SEQ_COPY_COMBINED(withitem)
ASDL_SEQ_COPY_COMBINED(match_case)
ASDL_SEQ_COPY_COMBINED(identifier)
ASDL_SEQ_COPY_COMBINED(pattern)
Default
Code generation for a default constructor for every type in the Python AST tree
import pathlibfrom glob import globimport reimport sys
def extract_enum_kinds(source):
    """Collect the ``*_kind`` enums declared in a C header.

    Args:
        source: Full text of the header.

    Returns:
        A dict mapping each enum name that ends in ``_kind`` to the list of
        ``*_kind`` identifiers found in its body, in order of appearance.
    """
    enum_pattern = re.compile(r"enum (\w+_kind)\s*\{([^}]+)\};")
    member_pattern = re.compile(r"\b(\w+_kind)\b")
    return {
        found.group(1): member_pattern.findall(found.group(2))
        for found in enum_pattern.finditer(source)
    }
def extract_pyast_constructors(source):
    """Find the ``_PyAST_*`` constructor prototypes declared with ``PyAPI_FUNC``.

    Args:
        source: Full text of the header.

    Returns:
        A list with one dict per prototype, in order of appearance, holding
        ``"return_type"`` (e.g. ``"expr_ty"``), ``"name"`` (e.g.
        ``"_PyAST_Name"``) and ``"args"`` (the parameter declarations as
        stripped strings).
    """
    prototype = re.compile(r"PyAPI_FUNC\((\w+_ty)\)\s+(_PyAST_\w+)\s*\(([^)]*)\);")

    def split_parameters(raw):
        # Parameter lists may wrap over several lines (with or without a
        # trailing backslash); flatten them before splitting on commas.
        flat = raw.strip().replace("\\\n", "").replace("\n", " ")
        return [piece.strip() for piece in flat.split(",")]

    return [
        {
            "return_type": found.group(1),
            "name": found.group(2),
            "args": split_parameters(found.group(3)),
        }
        for found in prototype.finditer(source)
    ]
# Validate the command line before touching the filesystem so that a wrong
# invocation prints the usage text instead of failing on the header lookup.
if len(sys.argv) != 2:
    print("Usage: python default_gen.py <output_file>")
    sys.exit(1)

# TODO change me
HEADER_PATH = "cpython_bin/include/python*/internal/pycore_ast.h"
_header_candidates = glob(HEADER_PATH)
if not _header_candidates:
    # Without this check the lookup below fails with a bare IndexError.
    print("pycore_ast.h not found, pattern:", HEADER_PATH)
    sys.exit(1)
HEADER_PATH = _header_candidates[0]
HEADER_PATH = pathlib.Path(HEADER_PATH).resolve()

# One wrapper per constructor:
#   <return type> default<constructor>(ast_data_t *data) { return <constructor>(<default args>); }
FUNC_TEMPLATE = """
%s default%s(ast_data_t *data){
    return %s(%s);
}
"""

# Generated header; %s receives the Node_kind enumerators.
header_content = """
#ifndef DEFAULT_GEN_H
#define DEFAULT_GEN_H

#define Py_BUILD_CORE 1
#include "Python.h"
#include "pycore_ast.h"

typedef struct {
    PyArena *arena;
} ast_data_t;

typedef enum Node_kind {
    %s
} Node_kind_t;

extern const int node_kind_cnt;

extern const int table_size[];

extern const void(** implementation_tables[])(ast_data_t *);

#endif
""".strip()

# Generated source preamble; %d is the number of tables, %s their sizes.
source_content = """
#include "default_gen.h"
#include "default.h"

const int node_kind_cnt = %d;
const int table_size[] = {%s};
""".strip()

print("parse from", HEADER_PATH)

with open(HEADER_PATH, "r") as f:
    source = f.read()
    enums = extract_enum_kinds(source)

# No constructor table is generated for the `mod` node kinds.
enums.pop("_mod_kind")
constructors = extract_pyast_constructors(source)

# make a C style dict, enum -> function pointers
# "_expr_kind" -> constructors returning exactly "expr_ty". Exact comparison
# (rather than substring search) cannot pick up a type whose name merely
# contains the kind name.
implementation_tables = {}
for enum_name in enums:
    node_type = enum_name.removeprefix("_").removesuffix("_kind") + "_ty"
    implementation_tables[enum_name] = [
        constructor for constructor in constructors
        if constructor["return_type"] == node_type
    ]

source_content = source_content % (
    len(enums),
    ", ".join(str(len(v)) for v in implementation_tables.values()),
)

header_content = header_content % (
    "\n    ".join(f"{name} = {i}," for i, name in enumerate(enums.keys())),
)

for enum_name, functions in implementation_tables.items():
    for func in functions:
        call_args = []
        for a in func["args"]:
            # remove abundant spaces
            a = " ".join(a.strip().split()).split(" ")
            if a[1][0] == "*":
                # Pointer parameter: the arena comes from `data`, any other
                # pointer type from its default_<type>_ptr() provider.
                if a[0] == "PyArena":
                    call_args.append("data->arena")
                else:
                    call_args.append(f"default_{a[0]}_ptr(data)")
            else:
                call_args.append(f"default_{a[0]}(data)")
        source_content += FUNC_TEMPLATE % (
            func["return_type"],
            func["name"],
            func["name"],
            ", ".join(call_args),
        )
    # Table of the wrappers generated for this enum (no trailing comma).
    source_content += """
void(* %s_funcs[])(ast_data_t *) = {
    %s
};
""" % (
        enum_name,
        ",\n    ".join(f"(void *)default{func['name']}" for func in functions),
    )

# Table of tables, in the same order as the Node_kind enumerators.
source_content += """
const void(** implementation_tables[])(ast_data_t *) = {
    %s
};
""" % (
    ",\n    ".join(f"{name}_funcs" for name in enums.keys())
)

with open(sys.argv[1] + ".c", "w") as f:
    f.write(source_content)
with open(sys.argv[1] + ".h", "w") as f:
    f.write(header_content)
and some manually implemented functions
#ifndef AST_DEFAULT_H
#define AST_DEFAULT_H

// pycore_ast.h is a CPython internal header; Py_BUILD_CORE is defined first so
// that it can be included.
#define Py_BUILD_CORE 1
#include "Python.h"
#include "pycore_ast.h"

// State handed to every default_* provider.
// NOTE(review): the generated default_gen.h template declares an ast_data_t
// typedef as well, and the generated .c includes both headers -- confirm the
// two definitions do not clash in the real project.
typedef struct {
    PyArena *arena;
} ast_data_t;

// Default value providers for by-value constructor parameters.
// NOTE(review): the generator calls default_<C type>(data) using the full type
// name (e.g. unaryop_ty -> default_unaryop_ty); default_alias and
// default_unaryop below do not follow that pattern -- confirm the names.
identifier default_identifier(ast_data_t *data);
arguments_ty default_arguments_ty(ast_data_t *data);
stmt_ty default_stmt_ty(ast_data_t *data);
string default_string(ast_data_t *data);
expr_ty default_expr_ty(ast_data_t *data);
int default_int(ast_data_t *data);
constant default_constant(ast_data_t *data);
expr_context_ty default_expr_context_ty(ast_data_t *data);
alias_ty default_alias(ast_data_t *data);
unaryop_ty default_unaryop(ast_data_t *data);
operator_ty default_operator_ty(ast_data_t *data);

// Default value providers for pointer (sequence) constructor parameters; the
// generator calls default_<type>_ptr(data) for these.
asdl_arguments_seq *default_asdl_arguments_seq_ptr(ast_data_t *data);
asdl_identifier_seq *default_asdl_identifier_seq_ptr(ast_data_t *data);
asdl_keyword_seq *default_asdl_keyword_seq_ptr(ast_data_t *data);
asdl_stmt_seq *default_asdl_stmt_seq_ptr(ast_data_t *data);
asdl_expr_seq *default_asdl_expr_seq_ptr(ast_data_t *data);
asdl_type_param_seq *default_asdl_type_param_seq_ptr(ast_data_t *data);
asdl_withitem_seq *default_asdl_withitem_seq_ptr(ast_data_t *data);
asdl_match_case_seq *default_asdl_match_case_seq_ptr(ast_data_t *data);
asdl_excepthandler_seq *default_asdl_excepthandler_seq_ptr(ast_data_t *data);
asdl_alias_seq *default_asdl_alias_seq_ptr(ast_data_t *data);

#endif // AST_DEFAULT_H
#include "default.h"
#include "ast.h"
/*
 * Default identifier: asks gen_name_id() for the name that belongs to the
 * current counter value, then advances the counter.
 * NOTE(review): gen_name_id and the gen_name_cnt field are not defined in the
 * code shown here (presumably ast.h) -- confirm.
 */
identifier default_identifier(ast_data_t *data)
{
    identifier name = gen_name_id(data->gen_name_cnt);
    data->gen_name_cnt++;
    return name;
}
/*
 * Default `arguments` node: an empty positional-argument sequence and NULL for
 * every other field.
 */
arguments_ty default_arguments_ty(ast_data_t *data)
{
    asdl_arg_seq *no_args = _Py_asdl_arg_seq_new(0, data->arena);
    return _PyAST_arguments(NULL, no_args, NULL, NULL, NULL, NULL, NULL, data->arena);
}
/*
 * Default statement: a `pass` node allocated in data->arena.
 * LINE is not defined in the code shown here; it supplies the position
 * arguments of the constructor.
 */
stmt_ty default_stmt_ty(ast_data_t *data)
{
    stmt_ty pass_stmt = _PyAST_Pass(LINE, data->arena);
    return pass_stmt;
}
/* Default `string` value: none (NULL). */
string default_string(ast_data_t *data)
{
    (void)data; /* unused */
    return NULL;
}
/*
 * Default `alias` node built from two default identifiers.
 *
 * The identifiers are created in separate statements: both calls advance the
 * name counter, and C does not define the order in which function arguments
 * are evaluated, so calling them inside the argument list made it
 * compiler-dependent which name became `name` and which `asname`.
 */
alias_ty default_alias(ast_data_t *data)
{
    identifier name = default_identifier(data);
    identifier asname = default_identifier(data);
    return _PyAST_alias(name, asname, LINE, data->arena);
}
/*
 * Default expression: a Constant node holding the default constant.
 *
 * The previous body returned _PyLong_GetZero() directly, i.e. a PyObject*
 * where an expr_ty (AST node pointer) is expected; consumers would then read
 * the int object's memory as an expression node.
 *
 * LINE is not defined in the code shown here; it supplies the position
 * arguments, as in the other constructors in this file.
 */
expr_ty default_expr_ty(ast_data_t *data)
{
    return _PyAST_Constant(default_constant(data), NULL, LINE, data->arena);
}
/* Default int value: 0. */
int default_int(ast_data_t *data)
{
    (void)data; /* unused */
    return 0;
}
/* Default constant value: the object returned by _PyLong_GetZero(). */
constant default_constant(ast_data_t *data)
{
    (void)data; /* unused */
    constant zero = _PyLong_GetZero();
    return zero;
}
/* Default expression context: Load. */
expr_context_ty default_expr_context_ty(ast_data_t *data)
{
    (void)data; /* unused */
    return Load;
}
/* Default binary operator: Add. */
operator_ty default_operator_ty(ast_data_t *data)
{
    (void)data; /* unused */
    return Add;
}
/*
 * Default `arguments` sequence: one element, the default arguments node.
 * Returns NULL when the sequence cannot be allocated (the result used to be
 * written through without a check).
 */
asdl_arguments_seq *default_asdl_arguments_seq_ptr(ast_data_t *data)
{
    asdl_arguments_seq *tmp = _Py_asdl_arguments_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_arguments_ty(data);
    return tmp;
}
/*
 * Default identifier sequence: one element, a default identifier.
 * Returns NULL when the sequence cannot be allocated (the result used to be
 * written through without a check).
 */
asdl_identifier_seq *default_asdl_identifier_seq_ptr(ast_data_t *data)
{
    asdl_identifier_seq *tmp = _Py_asdl_identifier_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_identifier(data);
    return tmp;
}
/* Default keyword sequence: empty (zero elements). */
asdl_keyword_seq *default_asdl_keyword_seq_ptr(ast_data_t *data)
{
    asdl_keyword_seq *empty = _Py_asdl_keyword_seq_new(0, data->arena);
    return empty;
}
/*
 * Default statement sequence: one element, the default statement.
 *
 * default_stmt_ty() takes the ast_data_t pointer; the previous call passed
 * data->arena (a PyArena*) instead, so the callee read `->arena` from the
 * wrong object.
 *
 * Returns NULL when the sequence cannot be allocated.
 */
asdl_stmt_seq *default_asdl_stmt_seq_ptr(ast_data_t *data)
{
    asdl_stmt_seq *tmp = _Py_asdl_stmt_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_stmt_ty(data);
    return tmp;
}
/*
 * Default expression sequence: one element, the default expression.
 * Returns NULL when the sequence cannot be allocated (the result used to be
 * written through without a check).
 */
asdl_expr_seq *default_asdl_expr_seq_ptr(ast_data_t *data)
{
    asdl_expr_seq *tmp = _Py_asdl_expr_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_expr_ty(data);
    return tmp;
}
/* Default type-parameter sequence: empty (zero elements). */
asdl_type_param_seq *default_asdl_type_param_seq_ptr(ast_data_t *data)
{
    asdl_type_param_seq *empty = _Py_asdl_type_param_seq_new(0, data->arena);
    return empty;
}
/*
 * Default with-item sequence: one `withitem` whose context expression is the
 * default expression and which binds no variable.
 *
 * The previous code stored a Pass statement (stmt_ty) in the withitem_ty slot,
 * so readers of the sequence interpreted a statement node as a withitem.
 *
 * Returns NULL when the sequence cannot be allocated.
 */
asdl_withitem_seq *default_asdl_withitem_seq_ptr(ast_data_t *data)
{
    asdl_withitem_seq *tmp = _Py_asdl_withitem_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = _PyAST_withitem(default_expr_ty(data), NULL, data->arena);
    return tmp;
}
/*
 * Default match-case sequence: one `case _:` whose body is the default
 * statement sequence.
 *
 * The previous code stored a Pass statement (stmt_ty) in the match_case_ty
 * slot, so readers of the sequence interpreted a statement node as a case.
 *
 * Returns NULL when the sequence cannot be allocated.
 */
asdl_match_case_seq *default_asdl_match_case_seq_ptr(ast_data_t *data)
{
    asdl_match_case_seq *tmp = _Py_asdl_match_case_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    /* MatchAs with neither pattern nor name is the wildcard pattern `_`. */
    pattern_ty wildcard = _PyAST_MatchAs(NULL, NULL, LINE, data->arena);
    asdl_stmt_seq *body = default_asdl_stmt_seq_ptr(data);
    tmp->typed_elements[0] = _PyAST_match_case(wildcard, NULL, body, data->arena);
    return tmp;
}
/*
 * Default except-handler sequence: one bare `except:` handler whose body is
 * the default statement sequence.
 *
 * The previous code stored a Pass statement (stmt_ty) in the excepthandler_ty
 * slot, so readers of the sequence interpreted a statement node as a handler.
 *
 * Returns NULL when the sequence cannot be allocated.
 */
asdl_excepthandler_seq *default_asdl_excepthandler_seq_ptr(ast_data_t *data)
{
    asdl_excepthandler_seq *tmp = _Py_asdl_excepthandler_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    asdl_stmt_seq *body = default_asdl_stmt_seq_ptr(data);
    /* No exception type and no bound name: a bare `except:`. */
    tmp->typed_elements[0] = _PyAST_ExceptHandler(NULL, NULL, body, LINE, data->arena);
    return tmp;
}
/*
 * Default alias sequence: one element, the default alias.
 * Returns NULL when the sequence cannot be allocated (the result used to be
 * written through without a check).
 */
asdl_alias_seq *default_asdl_alias_seq_ptr(ast_data_t *data)
{
    asdl_alias_seq *tmp = _Py_asdl_alias_seq_new(1, data->arena);
    if (tmp == NULL)
    {
        return NULL;
    }
    tmp->typed_elements[0] = default_alias(data);
    return tmp;
}
/* Default unary operator: Invert. */
unaryop_ty default_unaryop(ast_data_t *data)
{
    (void)data; /* unused */
    return Invert;
}
How to use
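Run deepcopy_ast.py with the path to pycore_ast.h as the first argument and the destination filename (without extension) as the second argument.
default_gen.py takes only the destination filename (without extension).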
E.g:
python deepcopy_ast.py $(readlink -f $CPYTHON_BIN_PATH/include/python3.*/internal/pycore_ast.h) src/codgen/deepcopy_gen
python default_gen.py src/codgen/default_gen
Then add all above files into your project.
Check CPython-AST-Fuzzer