Source code for TagScriptEngine.interpreter

from __future__ import annotations

import logging
from itertools import islice
from typing import Any, Dict, List, Optional, Protocol, Tuple, cast
from typing_extensions import TypeAlias

from .exceptions import (
    ProcessError,
    StopError,
    TagScriptError,
    WorkloadExceededError,
)
from .interface import Adapter, Block
from .utils import maybe_await
from .verb import Verb

__all__: Tuple[str, ...] = (
    "Interpreter",
    "AsyncInterpreter",
    "Context",
    "Response",
    "Node",
    "build_node_tree",
)

log: logging.Logger = logging.getLogger(__name__)

AdapterDict: TypeAlias = Dict[str, Adapter]
AnyDict: TypeAlias = Dict[str, Any]


class _Node(Protocol):
    def __init__(self, coordinates: Tuple[int, int], verb: Optional[Verb] = None) -> None: ...

    def __str__(self) -> str: ...

    def __repr__(self) -> str: ...


[docs] class Node(_Node): """ A low-level object representing a bracketed block. Attributes ---------- coordinates: Tuple[int, int] The start and end position of the bracketed text block. verb: Optional[Verb] The determined Verb for this node. output: The `Block` processed output for this node. """ __slots__: Tuple[str, ...] = ("output", "verb", "coordinates") def __init__(self, coordinates: Tuple[int, int], verb: Optional[Verb] = None) -> None: self.output: Optional[str] = None self.verb: Optional[Verb] = verb self.coordinates: Tuple[int, int] = coordinates def __str__(self) -> str: return str(self.verb) + " at " + str(self.coordinates) def __repr__(self) -> str: return f"<Node verb={self.verb!r} coordinates={self.coordinates!r} output={self.output!r}>"
[docs] def build_node_tree(message: str) -> List[Node]: """ Function that finds all possible nodes in a string. Returns ------- List[Node] A list of all possible text bracket blocks. """ nodes: List[Node] = [] previous: str = r"" starts: List[int] = [] for idx, char in enumerate(message): if char == "{" and previous != r"\\": starts.append(idx) if char == "}" and previous != r"\\": if not starts: continue coords: Tuple[int, int] = (starts.pop(), idx) node: Node = Node(coords) nodes.append(node) previous: str = char return nodes
class _Response(Protocol): def __init__( self, *, variables: Optional[AdapterDict] = None, extra_kwargs: Optional[AnyDict] = None, ) -> None: ... def __repr__(self) -> str: ...
[docs] class Response(_Response): """ An object containing information on a completed TagScript process. Attributes ---------- body: str The cleaned message with all verbs interpreted. actions: Dict[str, Any] A dictionary that blocks can access and modify to define post-processing actions. variables: Dict[str, Adapter] A dictionary of variables that blocks such as the `LooseVariableGetterBlock` can access. extra_kwargs: Dict[str, Any] A dictionary of extra keyword arguments that blocks can use to define their own behavior. """ __slots__: Tuple[str, ...] = ("body", "actions", "variables", "extra_kwargs") def __init__( self, *, variables: Optional[AdapterDict] = None, extra_kwargs: Optional[AnyDict] = None, ) -> None: self.body: Optional[str] = None self.actions: AnyDict = {} self.variables: AdapterDict = variables if variables is not None else {} self.extra_kwargs: AnyDict = extra_kwargs if extra_kwargs is not None else {} def __repr__(self) -> str: return ( f"<Response body={self.body!r} actions={self.actions!r} variables={self.variables!r}>" )
class _Context(Protocol): def __init__(self, verb: Verb, res: Response, interpreter: Interpreter, og: str) -> None: ... def __repr__(self) -> str: ...
[docs] class Context(_Context): """ An object containing data on the TagScript block processed by the interpreter. This class is passed to adapters and blocks during processing. Attributes ---------- verb: Verb The Verb object representing a TagScript block. original_message: str The original message passed to the interpreter. interpreter: Interpreter The interpreter processing the TagScript. """ __slots__: Tuple[str, ...] = ("verb", "original_message", "interpreter", "response") def __init__(self, verb: Verb, res: Response, interpreter: Interpreter, og: str) -> None: self.verb: Verb = verb self.original_message: str = og self.interpreter: Interpreter = interpreter self.response: Response = res def __repr__(self) -> str: return f"<Context verb={self.verb!r}>"
class _Interpreter(Protocol): def __init__(self, blocks: List[Block]) -> None: ... def __repr__(self) -> str: ... def _get_context( self, node: Node, final: str, *, response: Response, original_message: str, verb_limit: int, dot_parameter: bool, ) -> Context: ... def _get_acceptors(self, ctx: Context) -> List[Block]: ... def _process_blocks(self, ctx: Context, node: Node) -> Optional[str]: ... @staticmethod def _check_workload(charlimit: int, total_work: int, output: str) -> Optional[int]: ... @staticmethod def _text_deform(start: int, end: int, final: str, output: str) -> Tuple[str, int]: ... @staticmethod def _translate_nodes( node_ordered_list: List[Node], index: int, start: int, differential: int ) -> None: ... def _solve( self, message: str, node_ordered_list: List[Node], response: Response, *, charlimit: int, verb_limit: int = 6000, dot_parameter: bool, ) -> str: ... @staticmethod def _return_response(response: Response, output: str) -> Response: ... def process( self, message: str, seed_variables: Optional[AdapterDict] = None, *, charlimit: Optional[int] = None, dot_parameter: bool = False, **kwargs: Any, ) -> Response: ...
[docs] class Interpreter(_Interpreter): """ The TagScript interpreter. Attributes ---------- blocks: List[Block] A list of blocks to be used for TagScript processing. """ __slots__ = ("blocks",) def __init__(self, blocks: List[Block]) -> None: self.blocks: List[Block] = blocks def __repr__(self) -> str: return f"<{type(self).__name__} blocks={self.blocks!r}>" def _get_context( self, node: Node, final: str, *, response: Response, original_message: str, verb_limit: int, dot_parameter: bool, ) -> Context: # Get the updated verb string from coordinates and make the context start, end = node.coordinates node.verb = Verb(final[start : end + 1], limit=verb_limit, dot_parameter=dot_parameter) return Context(node.verb, response, self, original_message) def _get_acceptors(self, ctx: Context) -> List[Block]: acceptors = [b for b in self.blocks if b.will_accept(ctx)] log.debug("%r acceptors: %r", ctx, acceptors) return acceptors def _process_blocks(self, ctx: Context, node: Node) -> Optional[str]: acceptors = self._get_acceptors(ctx) for b in acceptors: value = b.process(ctx) if value is not None: # Value found? We're done here. value = str(value) node.output = value return value @staticmethod def _check_workload(charlimit: int, total_work: int, output: str) -> Optional[int]: if not charlimit: return total_work += len(output) if total_work > charlimit: raise WorkloadExceededError( "The TSE interpreter had its workload exceeded. The total characters " f"attempted were {total_work}/{charlimit}" ) return total_work @staticmethod def _text_deform(start: int, end: int, final: str, output: str) -> Tuple[str, int]: message_slice_len = (end + 1) - start replacement_len = len(output) differential = ( replacement_len - message_slice_len ) # The change in size of `final` after the change is applied final = final[:start] + output + final[end + 1 :] return final, differential @staticmethod def _translate_nodes( node_ordered_list: List[Node], index: int, start: int, differential: int ) -> None: for future_n in islice(node_ordered_list, index + 1, None): new_start = None new_end = None if future_n.coordinates[0] > start: new_start = future_n.coordinates[0] + differential else: new_start = future_n.coordinates[0] if future_n.coordinates[1] > start: new_end = future_n.coordinates[1] + differential else: new_end = future_n.coordinates[1] future_n.coordinates = (new_start, new_end) def _solve( self, message: str, node_ordered_list: List[Node], response: Response, *, charlimit: int, verb_limit: int = 6000, dot_parameter: bool, ) -> str: final = message total_work = 0 for index, node in enumerate(node_ordered_list): start, end = node.coordinates ctx = self._get_context( node, final, response=response, original_message=message, verb_limit=verb_limit, dot_parameter=dot_parameter, ) log.debug("Processing context %r at (%r, %r)", ctx, start, end) try: output = self._process_blocks(ctx, node) except StopError as exc: log.debug("StopError raised on node %r", node, exc_info=exc) return final[:start] + exc.message if output is None: continue # If there was no value output, no need to text deform. total_work = self._check_workload(charlimit, cast(int, total_work), output) final, differential = self._text_deform(start, end, final, output) self._translate_nodes(node_ordered_list, index, start, differential) return final @staticmethod def _return_response(response: Response, output: str) -> Response: if response.body is None: response.body = output.strip() else: # Dont override an overridden response. response.body = response.body.strip() return response
[docs] def process( self, message: str, seed_variables: Optional[AdapterDict] = None, *, charlimit: Optional[int] = None, dot_parameter: bool = False, **kwargs: Any, ) -> Response: """ Processes a given TagScript string. Parameters ---------- message: str A TagScript string to be processed. seed_variables: Dict[str, Adapter] A dictionary containing strings to adapters to provide context variables for processing. charlimit: int The maximum characters to process. dot_parameter: bool Whether the parameter should be followed after a "." or use the default of parantheses. kwargs: Dict[str, Any] Additional keyword arguments that may be used by blocks during processing. Returns ------- Response A response object containing the processed body, actions and variables. Raises ------ TagScriptError A block intentionally raised an exception, most likely due to invalid user input. WorkloadExceededError Signifies the interpreter reached the character limit, if one was provided. ProcessError An unexpected error occurred while processing blocks. """ response = Response(variables=seed_variables, extra_kwargs=kwargs) node_ordered_list = build_node_tree(message) try: output = self._solve( message, node_ordered_list, response, charlimit=cast(int, charlimit), dot_parameter=dot_parameter, ) except TagScriptError: raise except Exception as error: raise ProcessError(error, response, self) from error return self._return_response(response, output)
[docs] class AsyncInterpreter(Interpreter): """ An asynchronous subclass of :class:`Interpreter` that allows blocks to implement asynchronous methods. Synchronous blocks are still supported. This subclass has no additional attributes from the :class:`Interpreter` class. See :class:`Interpreter` for full documentation. """ async def _get_acceptors(self, ctx: Context) -> List[Block]: # type: ignore return [b for b in self.blocks if await maybe_await(b.will_accept, ctx)] async def _process_blocks(self, ctx: Context, node: Node) -> Optional[str]: # type: ignore acceptors = await self._get_acceptors(ctx) for b in acceptors: value = await maybe_await(b.process, ctx) if value is not None: # Value found? We're done here. value = str(value) node.output = value return value async def _solve( # type: ignore self, message: str, node_ordered_list: List[Node], response: Response, *, charlimit: int, verb_limit: int = 6000, dot_parameter: bool, ) -> str: final = message total_work = 0 for index, node in enumerate(node_ordered_list): start, end = node.coordinates ctx = self._get_context( node, final, response=response, original_message=message, verb_limit=verb_limit, dot_parameter=dot_parameter, ) try: output = await self._process_blocks(ctx, node) except StopError as exc: return final[:start] + exc.message if output is None: continue # If there was no value output, no need to text deform. total_work = self._check_workload(charlimit, cast(int, total_work), output) final, differential = self._text_deform(start, end, final, output) self._translate_nodes(node_ordered_list, index, start, differential) return final
[docs] async def process( # type: ignore self, message: str, seed_variables: Optional[AdapterDict] = None, *, charlimit: Optional[int] = None, dot_parameter: bool = False, **kwargs: Any, ) -> Response: """ Asynchronously process a given TagScript string. This method has no additional attributes from the :class:`Interpreter` class. See :meth:`Interpreter.process` for full documentation. """ response = Response(variables=seed_variables, extra_kwargs=kwargs) node_ordered_list = build_node_tree(message) try: output = await self._solve( message, node_ordered_list, response, charlimit=cast(int, charlimit), dot_parameter=dot_parameter, ) except TagScriptError: raise except Exception as error: raise ProcessError(error, response, self) from error return self._return_response(response, output)