shuffle around TypeSource and ProxyExpr

This commit is contained in:
Shiz 2021-06-25 19:18:37 +02:00
parent cea7e8733f
commit c386a5577f
3 changed files with 79 additions and 77 deletions

View File

@ -1,13 +1,7 @@
import os
import math
import operator
import functools
from typing import Any, Optional as O, Sequence, Mapping, Callable, Generic as G, TypeVar, Tuple
from .base import Type, Context, PathElement
from .io import Stream, Segment, Pos
from .meta import Wrapper
from . import to_type
from typing import Any, Optional as O, Sequence, Mapping, Callable, Generic as G, TypeVar
symbols = {
@ -257,57 +251,7 @@ class CompExpr(Expr[bool]):
def __repr__(self) -> str:
return f'({self.__left!r} {symbols[self.__op]} {self.__right!r})'
class TypeSource(G[T], Wrapper[T], BaseExpr[T]):
def __init__(self, child: Type[T], count: int) -> None:
super().__init__(child)
self.stack: list[Tuple[Context, Segment, Stream, Pos, T]] = []
self.pstack: list[T] = []
self.count = count
def parse(self, context: Context, stream: Stream) -> T:
pos = stream.tell()
value = super().parse(context, stream)
for _ in range(self.count):
self.stack.append((context, context.segment, stream, pos, value))
return value
def dump(self, context: Context, stream: Stream, value: T) -> None:
pos = stream.tell()
for _ in range(self.count):
self.stack.append((context, context.segment, stream, pos, value))
super().dump(context, stream, value)
def sizeof(self, context: Context, value: O[T]) -> None:
for _ in range(self.count):
self.pstack.append(value)
return super().sizeof(context, value)
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> None:
for _ in range(self.count):
self.pstack.append(value)
return super().offsetof(context, path, value)
def default(self, context: Context) -> T:
value = super().default(context)
for _ in range(self.count):
self.pstack.append(value)
return value
def _sx_get_(self, pop: bool = True) -> T:
_, _, _, _, value = self.stack.pop() if pop else self.stack[-1]
return value
def _sx_peek_(self, pop: bool = True) -> T:
value = self.pstack.pop() if pop else self.pstack[-1]
return value
def _sx_put_(self, value: T, pop: bool = True) -> None:
context, segment, stream, pos, _ = self.stack.pop() if pop else self.stack[-1]
with context.enter_segment(segment, stream, pos, os.SEEK_SET) as f:
context.dump(to_type(self.child), f, value)
class IndirectExpr(G[T], Expr[T]):
class ProxyExpr(G[T], Expr[T]):
def __init__(self, name: str) -> None:
self.__name = name
self.__source: O[BaseExpr[T]] = None

View File

@ -1,6 +1,9 @@
from typing import Optional as O, Generic as G, Sequence, TypeVar, Any
from typing import Optional as O, Generic as G, Sequence, Tuple, TypeVar, Any
import os
from .base import Type, Context, PathElement, Error
from .io import Stream
from .io import Stream, Segment, Pos
from .expr import BaseExpr
from . import to_type
@ -26,12 +29,12 @@ class Wrapper(G[T], Type[T]):
return context.default(to_type(self.child))
def __str__(self) -> str:
return str(self.child)
return str(to_type(self.child))
def __repr__(self) -> str:
return repr(self.child)
return repr(to_type(self.child))
class Generic(Type):
class Generic(G[T], Type[T], BaseExpr[T]):
__slots__ = ('name', 'stack')
def __init__(self, name: str) -> None:
@ -47,30 +50,36 @@ class Generic(Type):
def pop(self) -> None:
self.stack.pop()
def _sx_get_(self, pop: bool = False) -> T:
return self.stack[-1]
def _sx_peek_(self, pop: bool = False) -> T:
return self.stack[-1]
def _get_sx_type_(self, ident: Any) -> Type:
return to_type(self.stack[-1])
def parse(self, context: Context, stream: Stream) -> Any:
def parse(self, context: Context, stream: Stream) -> T:
if not self.stack:
raise Error(context, 'unresolved generic')
return context.parse(to_type(self.stack[-1]), stream)
def dump(self, context: Context, stream: Stream, value: O[Any]) -> None:
def dump(self, context: Context, stream: Stream, value: T) -> None:
if not self.stack:
raise Error(context, 'unresolved generic')
context.dump(to_type(self.stack[-1]), stream, value)
def sizeof(self, context: Context, value: O[Any]) -> O[int]:
def sizeof(self, context: Context, value: O[T]) -> O[int]:
if not self.stack:
return None
return context.sizeof(to_type(self.stack[-1]), value)
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[Any]) -> O[int]:
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]:
if not self.stack:
return None
return context.offsetof(to_type(self.stack[-1]), path, value)
def default(self, context: Context) -> Any:
def default(self, context: Context) -> T:
if not self.stack:
raise Error(context, 'unresolved generic')
return context.default(to_type(self.stack[-1]))
@ -85,3 +94,52 @@ class Generic(Type):
def __deepcopy__(self, memo: Any) -> Any:
return self
class TypeSource(G[T], Wrapper[T], BaseExpr[T]):
def __init__(self, child: Type[T], count: int) -> None:
super().__init__(child)
self.stack: list[Tuple[Context, Segment, Stream, Pos, T]] = []
self.pstack: list[T] = []
self.count = count
def parse(self, context: Context, stream: Stream) -> T:
pos = stream.tell()
value = super().parse(context, stream)
for _ in range(self.count):
self.stack.append((context, context.segment, stream, pos, value))
return value
def dump(self, context: Context, stream: Stream, value: T) -> None:
pos = stream.tell()
for _ in range(self.count):
self.stack.append((context, context.segment, stream, pos, value))
super().dump(context, stream, value)
def sizeof(self, context: Context, value: O[T]) -> None:
for _ in range(self.count):
self.pstack.append(value)
return super().sizeof(context, value)
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> None:
for _ in range(self.count):
self.pstack.append(value)
return super().offsetof(context, path, value)
def default(self, context: Context) -> T:
value = super().default(context)
for _ in range(self.count):
self.pstack.append(value)
return value
def _sx_get_(self, pop: bool = True) -> T:
_, _, _, _, value = self.stack.pop() if pop else self.stack[-1]
return value
def _sx_peek_(self, pop: bool = True) -> T:
value = self.pstack.pop() if pop else self.pstack[-1]
return value
def _sx_put_(self, value: T, pop: bool = True) -> None:
context, segment, stream, pos, _ = self.stack.pop() if pop else self.stack[-1]
with context.enter_segment(segment, stream, pos, os.SEEK_SET) as f:
context.dump(to_type(self.child), f, value)

View File

@ -13,18 +13,18 @@ from ..core import to_type
from ..core.base import Context, Type, PathElement
from ..core.io import Stream, Pos, add_sizes
from ..core.util import indent, format_value, get_annot_locations
from ..core.meta import Generic
from ..core.expr import IndirectExpr, TypeSource
from ..core.meta import Generic, TypeSource
from ..core.expr import ProxyExpr
T = TypeVar('T')
class StructProxy:
class ProxyStruct:
def __init__(self):
self._sx_fields_ = {}
def __getattr__(self, name: str) -> IndirectExpr:
e = IndirectExpr(name)
def __getattr__(self, name: str) -> ProxyExpr:
e = ProxyExpr(name)
try:
self._sx_fields_[name].append(e)
except KeyError:
@ -209,10 +209,10 @@ class Struct:
else:
v = kwargs.pop(k)
setattr(self, k, v)
for name, value in kwargs.items():
setattr(self, name, value)
if kwargs:
raise AttributeError(', '.join(kwargs))
def __init_subclass__(cls, *, inject: bool = True, generics: Sequence[Generic] = (), **kwargs: Any):
def __init_subclass__(cls, *, inject: bool = True, generics: Sequence[Generic] = (), **kwargs: Any) -> None:
super().__init_subclass__()
# Get all generics definition
@ -243,7 +243,7 @@ class Struct:
localns.update({g.name: g for g in generics})
# Evaluate annotations into fields
proxy = StructProxy()
proxy = ProxyStruct()
localns['self'] = proxy
fields = {}
for name, (fn, line, globalns, value) in annots.items():