| #!/usr/bin/python |
| """Generate type/module slot files |
| """ |
| |
| # See the input file (Python/slots.toml) for a description of its format. |
| |
| import io |
| import sys |
| import json |
| import tomllib |
| import argparse |
| import functools |
| import contextlib |
| import collections |
| from pathlib import Path |
| |
# Banner text that CWriter places in a comment at the top of each output.
GENERATED_BY = 'Generated by Tools/build/generate_slots.py'

# Directories, located relative to this script (three levels up from it).
REPO_ROOT = Path(__file__).parent.parent.parent
INCLUDE_PATH = REPO_ROOT / 'Include'

# Default input file, and the default destination of each generated file
# (the outputs are only used when --generate-all is given).
DEFAULT_INPUT_PATH = REPO_ROOT / 'Python/slots.toml'
DEFAULT_PUBLIC_HEADER_PATH = INCLUDE_PATH / 'slots_generated.h'
DEFAULT_PRIVATE_HEADER_PATH = INCLUDE_PATH / 'internal/pycore_slots_generated.h'
DEFAULT_C_PATH = REPO_ROOT / 'Python/slots_generated.c'

# Maps a slot's table identifier (by default the first two characters of
# its field name) to the PyHeapTypeObject member that holds the field.
TABLES = dict(
    tp='ht_type',
    am='as_async',
    nb='as_number',
    mp='as_mapping',
    sq='as_sequence',
    bf='as_buffer',
)
| |
| |
class SlotInfo:
    """Information about one slot, backed by its entry in the input file.

    ``id``, ``kind`` and ``name`` are set eagerly by the constructor; the
    remaining settings are read from the raw entry on first access and
    then cached.
    """

    def __init__(self, id, data):
        """Create the description of slot number *id* from mapping *data*.

        *data* must contain ``'kind'`` and either ``'name'`` or
        ``'equivalents'``.

        Raises:
            KeyError: if a required key is missing.
            ValueError: if an explicit name is not a valid identifier.
        """
        self.id = id
        self.kind = data['kind']
        self._data = data
        try:
            self.name = data['name']
        except KeyError:
            # Unnamed slots are labelled by the names they stand for.
            self.name = '/'.join(data["equivalents"].values())
        else:
            # The name ends up in `#define` and `case` lines, so it has
            # to be an identifier.  (A real check rather than an `assert`,
            # which would be skipped under `python -O`.)
            if not self.name.isidentifier():
                raise ValueError(
                    f'slot name {self.name!r} is not a valid identifier')

    def to_dict(self):
        """Return the slot as a plain dict: its raw entry plus id and name.

        Used for the ``--jsonl`` output; the result is JSON-serializable
        as long as the raw entry is.
        """
        return {'id': self.id, 'name': self.name, **self._data}

    @functools.cached_property
    def equivalents(self):
        """The ``'equivalents'`` mapping; KeyError if the entry has none."""
        return self._data['equivalents']

    @functools.cached_property
    def dtype(self):
        """The slot's data type; defaults to ``'func'`` for type fields.

        Raises KeyError when no dtype is given for a non-type-field slot.
        """
        try:
            return self._data['dtype']
        except KeyError:
            if self.is_type_field:
                return 'func'
            raise

    @functools.cached_property
    def functype(self):
        """The ``'functype'`` setting; KeyError if the entry has none."""
        return self._data['functype']

    @functools.cached_property
    def is_type_field(self):
        """The ``'is_type_field'`` setting, or None when it is not given."""
        return self._data.get('is_type_field')

    @functools.cached_property
    def type_field(self):
        """Field name: ``'field'`` setting, or the name minus ``Py_``."""
        assert self.is_type_field
        return self._data.get('field', self.name.removeprefix('Py_'))

    @functools.cached_property
    def type_table_ident(self):
        """Table identifier: ``'table'`` setting, or the field's first
        two characters."""
        assert self.is_type_field
        return self._data.get('table', self.type_field[:2])

    @functools.cached_property
    def duplicate_handling(self):
        """The ``'duplicates'`` setting; ``'reject'`` by default."""
        return self._data.get('duplicates', 'reject')

    @functools.cached_property
    def null_handling(self):
        """The ``'nulls'`` setting, with a kind/dtype-dependent default.

        Without an explicit setting, compat slots allow NULL, ``ptr`` and
        ``func`` slots reject it, and everything else allows it.
        """
        try:
            return self._data['nulls']
        except KeyError:
            if self.kind == 'compat':
                return 'allow'
            if self.dtype in {'ptr', 'func'}:
                return 'reject'
        return 'allow'

    @functools.cached_property
    def must_be_static(self):
        """The ``'must_be_static'`` setting; False by default."""
        return self._data.get('must_be_static', False)
| |
| |
| def parse_slots(file): |
| toml_contents = tomllib.load(file) |
| result = [None] * len(toml_contents) |
| for key, data in toml_contents.items(): |
| slot_id = int(key) |
| try: |
| if result[slot_id]: |
| raise ValueError(f'slot ID {slot_id} repeated') |
| result[slot_id] = SlotInfo(slot_id, data) |
| except Exception as e: |
| e.add_note(f'handling slot {slot_id}') |
| raise |
| return result |
| |
| |
class CWriter:
    """Small helper that prints indented C source to a file object.

    An instance is callable: ``writer(...)`` is the same as
    ``writer.out(...)``.
    """

    def __init__(self, file):
        """Remember *file* and start it with the "generated by" banner."""
        self.file = file
        self.indent = ''
        banner = f'/* {GENERATED_BY} */'
        self(banner)
        self()

    def out(self, *args, **kwargs):
        """Print *args* as one line, preceded by the current indent.

        Keyword arguments are passed on to ``print``.
        """
        print(self.indent, file=self.file, end='')
        print(*args, file=self.file, **kwargs)

    __call__ = out

    @contextlib.contextmanager
    def block(self, header=None, end=''):
        """Context manager that wraps its body in a ``{``...``}`` block.

        *header*, if given, is printed before the opening brace; *end* is
        appended directly after the closing brace.  Output produced
        inside the context is indented one level deeper.
        """
        opening = ('{',) if header is None else (header, '{')
        self.out(*opening)
        saved_indent = self.indent
        self.indent = saved_indent + ' '
        yield
        self.indent = saved_indent
        self.out('}' + end)
| |
| |
def write_public_header(f, slots):
    """Write the public header, which defines the slot ID macros, to *f*.

    Every non-compat slot gets a ``#define NAME value`` line.  The value
    is normally the slot's own ID.  If a compat slot lists the name among
    its equivalents, the value is
    ``_Py_SLOT_COMPAT_VALUE(compat_id, own_id)`` instead.  That macro
    selects the own ID, except in limited-API builds targeting a version
    before 3.15, where it selects the compat ID.  ``_Py_slot_COUNT`` is
    defined as the number of slots.
    """
    out = CWriter(f)
    out('#ifndef _PY_HAVE_SLOTS_GENERATED_H')
    out('#define _PY_HAVE_SLOTS_GENERATED_H')
    out()
    out('#if !defined(Py_LIMITED_API) || Py_LIMITED_API+0 >= _Py_PACK_VERSION(3, 15)')
    out('#define _Py_SLOT_COMPAT_VALUE(OLD, NEW) NEW')
    out('#else')
    out('#define _Py_SLOT_COMPAT_VALUE(OLD, NEW) OLD')
    out('#endif')
    out()

    # Name of each slot that has a compat ID -> that compat ID.
    compat_ids = {
        new_name: slot.id
        for slot in slots
        if slot.kind == 'compat'
        for new_name in slot.equivalents.values()
    }

    for slot in slots:
        if slot.kind == 'compat':
            continue
        value = slot.id
        compat_id = compat_ids.get(slot.name)
        # Compare against None: a compat ID of 0 is falsy but still valid.
        if compat_id is not None:
            value = f'_Py_SLOT_COMPAT_VALUE({compat_id}, {slot.id})'
        out(f'#define {slot.name} {value}')
    out()
    out(f'#define _Py_slot_COUNT {len(slots)}')
    out('#endif /* _PY_HAVE_SLOTS_GENERATED_H */')
| |
| |
def _write_resolve_slot(out, slots, slots_by_name, kind):
    """Emit ``_PySlot_resolve_<kind>_slot``: maps compat IDs to real slots."""
    out('static inline uint16_t')
    out(f'_PySlot_resolve_{kind}_slot(uint16_t slot_id)')
    with out.block():
        with out.block('switch (slot_id)'):
            # Slots valid for this kind all share one `return slot_id`,
            # so their case labels are collected and emitted together.
            passthrough_cases = []
            for slot in slots:
                if slot.kind == 'compat':
                    new_slot = slots_by_name[slot.equivalents[kind]]
                    out(f'case {slot.id}:')
                    out(f' return {new_slot.name};')
                elif slot.kind in {kind, 'slot'}:
                    passthrough_cases.append(f'case {slot.name}:')
            for case in passthrough_cases:
                out(case)
            out(' return slot_id;')
            out('default:')
            out(' return Py_slot_invalid;')


def _write_type_getslot(out, slots):
    """Emit ``_PySlot_type_getslot``: reads a type-field slot from a type."""
    out('static inline void*')
    out('_PySlot_type_getslot(PyTypeObject *tp, uint16_t slot_id)')
    with out.block():
        with out.block('switch (slot_id)'):
            for slot in slots:
                if not slot.is_type_field:
                    continue
                field = slot.type_field
                table_ident = slot.type_table_ident
                if table_ident == 'tp':
                    # Field lives directly in the type object.
                    out(f'case {slot.name}:')
                    out(f' return (void*)tp->{field};')
                    continue
                # Otherwise the field is behind something that may be
                # absent; the generated code returns NULL in that case.
                if table_ident == 'ht':
                    cond = 'tp->tp_flags & Py_TPFLAGS_HEAPTYPE'
                    val = f'((PyHeapTypeObject*)tp)->{field}'
                else:
                    table = TABLES[table_ident]
                    cond = f'tp->tp_{table}'
                    val = f'tp->tp_{table}->{field}'
                out(f'case {slot.name}:')
                out(f' if (!({cond})) return NULL;')
                out(f' return (void*){val};')
        # Reached only for IDs without a `case` above.
        out('_PySlot_err_bad_slot("PyType_GetSlot", slot_id);')
        out('return NULL;')


def _write_heaptype_apply_field_slot(out, slots):
    """Emit ``_PySlot_heaptype_apply_field_slot``: stores a slot value."""
    out('static inline void')
    out('_PySlot_heaptype_apply_field_slot(PyHeapTypeObject *ht,',
        'PySlot slot)')
    with out.block():
        with out.block('switch (slot.sl_id)'):
            for slot in slots:
                if not slot.is_type_field:
                    continue
                field = slot.type_field
                table_ident = slot.type_table_ident
                if table_ident == 'ht':
                    # 'ht' fields have no TABLES entry; not handled here.
                    continue
                table = TABLES[table_ident]
                # Function slots are cast to their specific function type.
                cast = f'({slot.functype})' if slot.dtype == 'func' else ''
                out(f'case {slot.name}:')
                out(f' ht->{table}.{field} = {cast}slot.sl_{slot.dtype};')
                out(' break;')


def _write_get_dtype(out, slots):
    """Emit ``_PySlot_get_dtype``: maps a slot ID to its data type."""
    out('static inline _PySlot_DTYPE')
    out('_PySlot_get_dtype(uint16_t slot_id)')
    with out.block():
        with out.block('switch (slot_id)'):
            for slot in slots:
                if slot.kind == 'compat':
                    continue
                out(f'case {slot.name}: '
                    f'return _PySlot_DTYPE_{slot.dtype.upper()};')
            out('default: return _PySlot_DTYPE_VOID;')


def _write_problem_handling(out, slots, func_name, attr_name):
    """Emit a function mapping a slot ID to a ``_PySlot_PROBLEM_*`` value.

    *attr_name* names the SlotInfo attribute that holds the handling.
    """
    out('static inline _PySlot_PROBLEM_HANDLING')
    out(f'{func_name}(uint16_t slot_id)')
    with out.block():
        with out.block('switch (slot_id)'):
            cases_by_handling = collections.defaultdict(list)
            for slot in slots:
                if slot.kind == 'compat':
                    continue
                handling = getattr(slot, attr_name)
                cases_by_handling[handling.upper()].append(
                    f'case {slot.name}:')
            # REJECT is what `default:` returns, so it needs no cases.
            # (It may be absent altogether, hence the pop default.)
            cases_by_handling.pop('REJECT', None)
            for handling, cases in cases_by_handling.items():
                for case in cases:
                    out(case)
                out(f' return _PySlot_PROBLEM_{handling};')
            out('default:')
            out(' return _PySlot_PROBLEM_REJECT;')


def _write_get_must_be_static(out, slots):
    """Emit ``_PySlot_get_must_be_static``: true for must-be-static slots."""
    out('static inline bool')
    out('_PySlot_get_must_be_static(uint16_t slot_id)')
    with out.block():
        with out.block('switch (slot_id)'):
            for slot in slots:
                if slot.must_be_static:
                    out(f'case {slot.name}: return true;')
        out('return false;')


def write_private_header(f, slots):
    """Write the private header, a set of static inline functions, to *f*.

    The header contains, in order: a slot-ID resolver for each of the
    'type' and 'mod' kinds, a getter and a setter for slots stored in type
    fields, and per-slot lookups for dtype, duplicate handling, NULL
    handling and the must-be-static flag.
    """
    out = CWriter(f)
    slots_by_name = {slot.name: slot for slot in slots}

    out('#ifndef _PY_HAVE_INTERNAL_SLOTS_GENERATED_H')
    out('#define _PY_HAVE_INTERNAL_SLOTS_GENERATED_H')
    for kind in 'type', 'mod':
        out()
        _write_resolve_slot(out, slots, slots_by_name, kind)
    out()
    _write_type_getslot(out, slots)
    out()
    _write_heaptype_apply_field_slot(out, slots)
    out()
    _write_get_dtype(out, slots)
    out()
    _write_problem_handling(
        out, slots, '_PySlot_get_duplicate_handling', 'duplicate_handling')
    out()
    _write_problem_handling(
        out, slots, '_PySlot_get_null_handling', 'null_handling')
    out()
    _write_get_must_be_static(out, slots)
    out()
    out('#endif /* _PY_HAVE_INTERNAL_SLOTS_GENERATED_H */')
| |
| |
def write_c(f, slots):
    """Write the C source file, which defines ``_PySlot_names``, to *f*.

    The array holds each slot's name as a string literal, in the order of
    *slots*, followed by a terminating ``NULL``.
    """
    writer = CWriter(f)
    writer('#include "Python.h"')
    writer('#include "pycore_slots.h" // _PySlot_names')
    writer()
    name_entries = [f'"{slot.name}",' for slot in slots]
    with writer.block('const char *const _PySlot_names[] =', end=';'):
        for entry in name_entries:
            writer(entry)
        writer('NULL')
| |
| |
@contextlib.contextmanager
def replace_file(filename):
    """Context manager that rewrites *filename* only if its content changes.

    Yields an in-memory text buffer for the caller to write to.  When the
    context exits normally, the buffer is compared with the file's
    current content, and the file is written only if they differ (or if
    the file does not exist).  Either way, a status line is printed to
    stderr.  If the body raises, the file is left untouched.

    The file is read and written as UTF-8, so the result does not depend
    on the locale's default encoding.
    """
    file_path = Path(filename)
    with io.StringIO() as sio:
        yield sio
        # Only reached when the body finished without an exception.
        new_text = sio.getvalue()
    try:
        old_text = file_path.read_text(encoding='utf-8')
    except FileNotFoundError:
        old_text = None
    if old_text == new_text:
        print(f'{filename}: not modified', file=sys.stderr)
    else:
        print(f'{filename}: writing new content', file=sys.stderr)
        file_path.write_text(new_text, encoding='utf-8')
| |
| |
def main(argv):
    """Command-line entry point.

    *argv* is the full argument list including the program name (as in
    ``sys.argv``); it is not modified.  The input file is parsed, and
    then each requested output is produced: JSON Lines on stdout and/or
    the public header, private header and C file.
    """
    if len(argv) == 1:
        # No sense calling this with no arguments.
        # Build a new list instead of appending, so the caller's list
        # (normally sys.argv itself) is left untouched.
        argv = [*argv, '--help']

    parser = argparse.ArgumentParser(prog=argv[0], description=__doc__)
    parser.add_argument(
        '-i', '--input', default=DEFAULT_INPUT_PATH,
        help=f'the input file (default: {DEFAULT_INPUT_PATH})')
    parser.add_argument(
        '--generate-all', action=argparse.BooleanOptionalAction,
        help='write all output files to their default locations')
    parser.add_argument(
        '-j', '--jsonl', action=argparse.BooleanOptionalAction,
        help='write info to stdout in "JSON Lines" format (one JSON per line)')
    outfile_group = parser.add_argument_group(
        'output files',
        description='By default, no files are generated. Use --generate-all '
        + 'or the options below to generate them.')
    outfile_group.add_argument(
        '-H', '--public-header',
        help='file into which to write the public header')
    outfile_group.add_argument(
        '-I', '--private-header',
        help='file into which to write the private header')
    outfile_group.add_argument(
        '-C', '--cfile',
        help='file into which to write internal C code')
    args = parser.parse_args(argv[1:])

    if args.generate_all:
        # Explicitly given paths win; only fill in the missing ones.
        if args.public_header is None:
            args.public_header = DEFAULT_PUBLIC_HEADER_PATH
        if args.private_header is None:
            args.private_header = DEFAULT_PRIVATE_HEADER_PATH
        if args.cfile is None:
            args.cfile = DEFAULT_C_PATH

    # tomllib requires the file to be opened in binary mode.
    with open(args.input, 'rb') as f:
        slots = parse_slots(f)

    if args.jsonl:
        for slot in slots:
            print(json.dumps(slot.to_dict()))

    if args.public_header:
        with replace_file(args.public_header) as f:
            write_public_header(f, slots)

    if args.private_header:
        with replace_file(args.private_header) as f:
            write_private_header(f, slots)

    if args.cfile:
        with replace_file(args.cfile) as f:
            write_c(f, slots)


if __name__ == "__main__":
    main(sys.argv)