import ast
import html
import os
import sys
from collections import defaultdict, Counter
from textwrap import dedent
from types import FrameType, CodeType, TracebackType
from typing import (
Iterator, List, Tuple, Optional, NamedTuple,
Any, Iterable, Callable, Union,
Sequence)
from typing import Mapping
import executing
from asttokens.util import Token
from executing import only
from pure_eval import Evaluator, is_expression_interesting
from stack_data.utils import (
truncate, unique_in_order, line_range,
frame_and_lineno, iter_stack, collapse_repeated, group_by_key_func,
cached_property, is_frame, _pygmented_with_ranges, assert_)
# Typed tuple describing a span of columns on a single source line, plus an
# arbitrary payload (e.g. a token, or a (variable, node) pair).
RangeInLine = NamedTuple(
    'RangeInLine',
    [
        ('start', int),
        ('end', int),
        ('data', Any),
    ],
)
RangeInLine.__doc__ = """
Represents a range of characters within one line of source code,
and some associated data.
Typically this will be converted to a pair of markers by markers_from_ranges.
"""
# Typed tuple for a string to splice into a line at a given column.
# Field order matters: Line.render sorts markers by (position, is_start).
MarkerInLine = NamedTuple(
    'MarkerInLine',
    [
        ('position', int),
        ('is_start', bool),
        ('string', str),
    ],
)
MarkerInLine.__doc__ = """
A string that is meant to be inserted at a given position in a line of source code.
For example, this could be an ANSI code or the opening or closing of an HTML tag.
is_start should be True if this is the first of a pair such as the opening of an HTML tag.
This will help to sort and insert markers correctly.
Typically this would be created from a RangeInLine by markers_from_ranges.
Then use Line.render to insert the markers correctly.
"""
class Variable(
    NamedTuple('_Variable',
               [('name', str),
                ('nodes', Sequence[ast.AST]),
                ('value', Any)])
):
    """
    An expression that appears one or more times in source code and its associated value.
    This will usually be a variable but it can be any expression evaluated by pure_eval.

    - name is the source text of the expression.
    - nodes is a list of equivalent nodes representing the same expression.
    - value is the safely evaluated value of the expression.

    Instances compare and hash by identity rather than by tuple contents,
    so a Variable can be used as a dict key or set member even when
    .value is unhashable or has an unusual __eq__.
    """
    __hash__ = object.__hash__
    __eq__ = object.__eq__
    # tuple defines its own __ne__, which would otherwise still compare
    # contents and make `a == b` and `a != b` both False for two distinct
    # but equal-looking variables. Keep != consistent with the identity ==.
    __ne__ = object.__ne__
class Source(executing.Source):
    """
    The source code of a single file and associated metadata.

    In addition to the attributes from the base class executing.Source,
    if .tree is not None, meaning this is valid Python code, objects have:
        - pieces: a list of Piece objects
        - tokens_by_lineno: a defaultdict(list) mapping line numbers to lists of tokens.

    Don't construct this class. Get an instance from frame_info.source.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Only parseable files get the asttokens treatment up front.
        if self.tree:
            self.asttokens()

    @cached_property
    def pieces(self) -> List[range]:
        """
        The file split into ranges of 1-based line numbers.
        Without an AST every line is its own single-line piece.
        """
        if self.tree:
            return list(self._clean_pieces())
        line_count = len(self.lines)
        return [
            range(lineno, lineno + 1)
            for lineno in range(1, line_count + 1)
        ]

    @cached_property
    def tokens_by_lineno(self) -> Mapping[int, List[Token]]:
        """
        Tokens grouped by the line number on which each token starts.
        Raises AttributeError if the file has no AST.
        """
        if not self.tree:
            raise AttributeError("This file doesn't contain valid Python, so .tokens_by_lineno doesn't exist")

        def starting_line(token):
            return token.start[0]

        all_tokens = self.asttokens().tokens
        return group_by_key_func(all_tokens, starting_line)

    def _clean_pieces(self) -> Iterator[range]:
        """
        Yield the raw pieces as ranges, with empty pieces dropped and
        blank lines trimmed off both ends of each piece.
        Raises AssertionError if consecutive raw pieces don't line up.
        """
        raw = self._raw_split_into_pieces(self.tree, 1, len(self.lines) + 1)
        spans = [(lo, hi) for lo, hi in raw if hi > lo]

        # Sanity check: each piece must begin exactly where the previous one ended.
        next_starts = [lo for lo, _ in spans[1:]]
        previous_ends = [hi for _, hi in spans[:-1]]
        if next_starts != previous_ends:
            mismatches = [
                set(pair)
                for pair in zip(next_starts, previous_ends)
                if len(set(pair)) > 1
            ]
            raise AssertionError("Pieces mismatches: %s" % mismatches)

        def is_blank(lineno):
            # Line numbers past the end of the file count as not blank,
            # which is what stops the trimming loops below.
            try:
                content = self.lines[lineno - 1]
            except IndexError:
                return False
            return not content.strip()

        for lo, hi in spans:
            while is_blank(lo):
                lo += 1
            while is_blank(hi - 1):
                hi -= 1
            if lo < hi:
                yield range(lo, hi)

    def _raw_split_into_pieces(
            self,
            stmt: ast.AST,
            start: int,
            end: int,
    ) -> Iterator[Tuple[int, int]]:
        """
        Recursively yield (start, end) line pairs covering start..end for stmt,
        splitting around every nested list of statements or except handlers.
        The pairs may be empty; _clean_pieces filters those out.
        """
        self.asttokens()

        for _field_name, children in ast.iter_fields(stmt):
            holds_statements = (
                isinstance(children, list)
                and children
                and isinstance(children[0], (ast.stmt, ast.ExceptHandler))
            )
            if not holds_statements:
                continue

            # Statements sharing the same line range form one group;
            # only the first statement of each group is descended into.
            by_line_range = group_by_key_func(children, line_range)
            for span, same_span in sorted(by_line_range.items()):
                representative = same_span[0]
                for inner_start, inner_end in self._raw_split_into_pieces(representative, *span):
                    # Whatever lies between the running start and the nested piece.
                    if start < inner_start:
                        yield start, inner_start
                    if inner_start < inner_end:
                        yield inner_start, inner_end
                    start = inner_end

        yield start, end
class Options:
    """
    Configuration for FrameInfo, either in the constructor or the .stack_data classmethod.
    These all determine which Lines and gaps are produced by FrameInfo.lines.

    before and after are the number of pieces of context to include in a frame
    in addition to the executing piece.

    include_signature is whether to include the function signature as a piece in a frame.

    If a piece (other than the executing piece) has more than max_lines_per_piece lines,
    it will be truncated with a gap in the middle.

    pygments_formatter is stored as given; FrameInfo requires it to be set
    before pygmented rendering can be used.
    """

    def __init__(
            self, *,
            before: int = 3,
            after: int = 1,
            include_signature: bool = False,
            max_lines_per_piece: int = 6,
            pygments_formatter=None
    ):
        # All settings are keyword-only and stored unmodified.
        self.before = before
        self.after = after
        self.include_signature = include_signature
        self.max_lines_per_piece = max_lines_per_piece
        self.pygments_formatter = pygments_formatter

    def __repr__(self):
        # Attributes are listed alphabetically so the output is stable.
        rendered = ", ".join(
            "{}={!r}".format(name, value)
            for name, value in sorted(vars(self).items())
        )
        return "{}({})".format(type(self).__name__, rendered)
class LineGap:
    """
    A singleton representing one or more lines of source code that were skipped
    in FrameInfo.lines.

    LINE_GAP can be created in two ways:
    - by truncating a piece of context that's too long.
    - immediately after the signature piece if Options.include_signature is true
      and the following piece isn't already part of the included pieces.
    """

    def __repr__(self):
        # Displayed under the name of the module-level instance.
        return "LINE_GAP"


# The shared instance that FrameInfo.lines inserts wherever lines were skipped.
LINE_GAP = LineGap()
class Line(object):
    """
    A single line of source code for a particular stack frame.

    Typically this is obtained from FrameInfo.lines.
    Since that list may also contain LINE_GAP, you should first check
    that this is really a Line before using it.

    Attributes:
        - frame_info
        - lineno: the 1-based line number within the file
        - text: the raw source of this line. For displaying text, see .render() instead.
        - leading_indent: the number of leading spaces that should probably be stripped.
            This attribute is set within FrameInfo.lines. If you construct this class
            directly you should probably set it manually (at least to 0).
            While it is still None, .render() treats it as 0.
        - is_current: whether this is the line currently being executed by the interpreter
            within this frame.
        - tokens: a list of source tokens in this line

    There are several helpers for constructing RangeInLines which can be converted to markers
    using markers_from_ranges which can be passed to .render():
        - token_ranges
        - variable_ranges
        - executing_node_ranges
        - range_from_node
    """

    def __init__(
            self,
            frame_info: 'FrameInfo',
            lineno: int,
    ):
        self.frame_info = frame_info
        self.lineno = lineno
        self.text = frame_info.source.lines[lineno - 1]  # type: str
        self.leading_indent = None  # type: Optional[int]

    def __repr__(self):
        return "<{self.__class__.__name__} {self.lineno} (current={self.is_current}) " \
               "{self.text!r} of {self.frame_info.filename}>".format(self=self)

    @property
    def is_current(self) -> bool:
        """
        Whether this is the line currently being executed by the interpreter
        within this frame.
        """
        return self.lineno == self.frame_info.lineno

    @property
    def tokens(self) -> List[Token]:
        """
        A list of source tokens in this line.
        The tokens are Token objects from asttokens:
        https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token
        """
        return self.frame_info.source.tokens_by_lineno[self.lineno]

    @cached_property
    def token_ranges(self) -> List[RangeInLine]:
        """
        A list of RangeInLines for each token in .tokens,
        where range.data is a Token object from asttokens:
        https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token
        """
        return [
            RangeInLine(
                token.start[1],
                token.end[1],
                token,
            )
            for token in self.tokens
        ]

    @cached_property
    def variable_ranges(self) -> List[RangeInLine]:
        """
        A list of RangeInLines for each Variable that appears at least partially in this line.
        The data attribute of the range is a pair (variable, node) where node is the particular
        AST node from the list variable.nodes that corresponds to this range.
        """
        return [
            self.range_from_node(node, (variable, node))
            for variable, node in self.frame_info.variables_by_lineno[self.lineno]
        ]

    @cached_property
    def executing_node_ranges(self) -> List[RangeInLine]:
        """
        A list of one or zero RangeInLines for the executing node of this frame.
        The list will have one element if the node can be found and it overlaps this line.
        """
        return self._raw_executing_node_ranges(
            self.frame_info._executing_node_common_indent
        )

    def _raw_executing_node_ranges(self, common_indent=0) -> List[RangeInLine]:
        """
        Same as executing_node_ranges, but with the minimum start column
        passed in explicitly instead of taken from the frame.
        The data of the returned range is the frame's Executing object.
        """
        ex = self.frame_info.executing
        node = ex.node
        if node:
            rang = self.range_from_node(node, ex, common_indent)
            if rang:
                return [rang]
        return []

    def range_from_node(
            self, node: ast.AST, data: Any, common_indent: int = 0
    ) -> Optional[RangeInLine]:
        """
        If the given node overlaps with this line, return a RangeInLine
        with the correct start and end and the given data.
        Otherwise, return None.

        The start of the range is never less than common_indent.
        None is also returned if the node ends on this line but
        no end column can be determined for it.
        """
        start, end = line_range(node)
        # Make end the last line of the node (inclusive) for the comparisons below
        end -= 1
        if not (start <= self.lineno <= end):
            return None

        if start == self.lineno:
            # Prefer token positions, fall back to the plain AST attribute
            try:
                range_start = node.first_token.start[1]
            except AttributeError:
                range_start = node.col_offset
        else:
            # The node began on an earlier line
            range_start = 0

        range_start = max(range_start, common_indent)

        if end == self.lineno:
            try:
                range_end = node.last_token.end[1]
            except AttributeError:
                try:
                    range_end = node.end_col_offset
                except AttributeError:
                    return None
        else:
            # The node continues on a later line
            range_end = len(self.text)

        return RangeInLine(range_start, range_end, data)

    def render(
            self,
            markers: Iterable[MarkerInLine] = (),
            *,
            strip_leading_indent: bool = True,
            pygmented: bool = False,
            escape_html: bool = False
    ) -> str:
        """
        Produces a string for display consisting of .text
        with the .strings of each marker inserted at the correct positions.
        If strip_leading_indent is true (the default) then leading spaces
        common to all lines in this frame will be excluded.

        If pygmented is true and the frame has a scope, the line is taken from
        the frame's pygmented scope lines instead, and passing markers
        is an error (ValueError).

        If escape_html is true, the pieces of source text (but not the
        marker strings) are passed through html.escape.
        """
        # leading_indent stays None until FrameInfo.lines (or the caller) sets it.
        # Treat that as nothing to strip: otherwise max() below raises TypeError
        # and text[:None] would select the whole line for removal.
        leading_indent = self.leading_indent or 0

        if pygmented and self.frame_info.scope:
            assert_(not markers, ValueError("Cannot use pygmented with markers"))
            start_line, lines = self.frame_info._pygmented_scope_lines
            result = lines[self.lineno - start_line]
            if strip_leading_indent:
                result = result.replace(self.text[:leading_indent], "", 1)
            return result

        text = self.text

        # This just makes the loop below simpler
        markers = list(markers) + [MarkerInLine(position=len(text), is_start=False, string='')]

        # Order by (position, is_start)
        markers.sort(key=lambda t: t[:2])

        parts = []
        if strip_leading_indent:
            start = leading_indent
        else:
            start = 0
        original_start = start

        for marker in markers:
            text_part = text[start:marker.position]
            if escape_html:
                text_part = html.escape(text_part)
            parts.append(text_part)
            parts.append(marker.string)

            # Ensure that start >= leading_indent
            start = max(marker.position, original_start)
        return ''.join(parts)
def markers_from_ranges(
        ranges: Iterable[RangeInLine],
        converter: Callable[[RangeInLine], Optional[Tuple[str, str]]],
) -> List[MarkerInLine]:
    """
    Build the MarkerInLines corresponding to some RangeInLines.

    converter is called once per range and must return either None,
    in which case the range contributes nothing, or a pair of strings:
    the first becomes a start marker at the range's start and the second
    an end marker at the range's end.

    Raises TypeError if either element of a returned pair isn't a str.
    """
    result = []
    for span in ranges:
        pair = converter(span)
        if pair is None:
            continue

        opening, closing = pair
        if not isinstance(opening, str) or not isinstance(closing, str):
            raise TypeError("converter should return None or a pair of strings")

        result.append(MarkerInLine(position=span.start, is_start=True, string=opening))
        result.append(MarkerInLine(position=span.end, is_start=False, string=closing))
    return result
def style_with_executing_node(style, modifier):
    """
    Return a subclass of the given style with extra entries for the executing node.

    style is either a style class (anything with a .styles mapping that can be
    subclassed) or a string, in which case it's looked up with
    pygments.styles.get_style_by_name.

    For every key `k` with value `v` in style.styles, the new class's styles
    also contain `k.ExecutingNode` mapped to `v + " " + modifier`.
    The new class has for_executing_node = True, which is the flag
    FrameInfo._pygmented_scope_lines checks on the formatter's style.
    """
    if isinstance(style, str):
        # Only the lookup by name needs this import, so keep it on this
        # branch instead of importing pygments on every call.
        from pygments.styles import get_style_by_name
        style = get_style_by_name(style)

    class NewStyle(style):
        for_executing_node = True

        styles = {
            **style.styles,
            **{
                k.ExecutingNode: v + " " + modifier
                for k, v in style.styles.items()
            }
        }

    return NewStyle
class RepeatedFrames:
    """
    A sequence of consecutive stack frames which shouldn't be displayed because
    the same code and line number were repeated many times in the stack, e.g.
    because of deep recursion.

    Attributes:
        - frames: list of raw frame or traceback objects
        - frame_keys: list of tuples (frame.f_code, lineno) extracted from the frame objects.
                        It's this information from the frames that is used to determine
                        whether two frames should be considered similar (i.e. repeating).
        - description: A string briefly describing frame_keys
    """

    def __init__(
            self,
            frames: List[Union[FrameType, TracebackType]],
            frame_keys: List[Tuple[CodeType, int]],
    ):
        self.frames = frames
        self.frame_keys = frame_keys

    @cached_property
    def description(self) -> str:
        """
        A string briefly describing the repeated frames, e.g.
            my_function at line 10 (100 times)

        Distinct (code, lineno) keys are listed most frequent first,
        ties ordered by code name, and joined with commas.
        """

        def most_frequent_then_name(entry):
            (code, _lineno), count = entry
            return -count, code.co_name

        tally = sorted(
            Counter(self.frame_keys).items(),
            key=most_frequent_then_name,
        )

        summaries = []
        for (code, lineno), count in tally:
            qualname = Source.for_filename(code.co_filename).code_qualname(code)
            summaries.append(
                '{name} at line {lineno} ({count} times)'.format(
                    name=qualname,
                    lineno=lineno,
                    count=count,
                )
            )
        return ', '.join(summaries)

    def __repr__(self):
        template = '<{self.__class__.__name__} {self.description}>'
        return template.format(self=self)
class FrameInfo(object):
    """
    Information about a frame!
    Pass either a frame object or a traceback object,
    and optionally an Options object to configure.

    Or use the classmethod FrameInfo.stack_data() for an iterator of FrameInfo and
    RepeatedFrames objects.

    Attributes:
        - frame: an actual stack frame object, either frame_or_tb or frame_or_tb.tb_frame
        - options
        - code: frame.f_code
        - source: a Source object
        - filename: a hopefully absolute file path derived from code.co_filename
        - scope: the AST node of the innermost function, class or module being executed
        - lines: a list of Line/LineGap objects to display, determined by options
        - executing: an Executing object from the `executing` library, which has:
            - .node: the AST node being executed in this frame, or None if it's unknown
            - .statements: a set of one or more candidate statements (AST nodes, probably just one)
                currently being executed in this frame.
            - .code_qualname(): the __qualname__ of the function or class being executed,
                or just the code name.

    Properties returning one or more pieces of source code (ranges of lines):
        - scope_pieces: all the pieces in the scope
        - included_pieces: a subset of scope_pieces determined by options
        - executing_piece: the piece currently being executed in this frame

    Properties returning lists of Variable objects:
        - variables: all variables in the scope
        - variables_by_lineno: variables organised into lines
        - variables_in_lines: variables contained within FrameInfo.lines
        - variables_in_executing_piece: variables contained within FrameInfo.executing_piece
    """

    def __init__(
            self,
            frame_or_tb: Union[FrameType, TracebackType],
            options: Optional[Options] = None,
    ):
        self.executing = Source.executing(frame_or_tb)
        frame, self.lineno = frame_and_lineno(frame_or_tb)
        self.frame = frame
        self.code = frame.f_code
        self.options = options or Options()  # type: Options
        self.source = self.executing.source  # type: Source

    def __repr__(self):
        return "{self.__class__.__name__}({self.frame})".format(self=self)

    @classmethod
    def stack_data(
            cls,
            frame_or_tb: Union[FrameType, TracebackType],
            options: Optional[Options] = None,
            *,
            collapse_repeated_frames: bool = True
    ) -> Iterator[Union['FrameInfo', RepeatedFrames]]:
        """
        An iterator of FrameInfo and RepeatedFrames objects representing
        a full traceback or stack. Similar consecutive frames are collapsed into RepeatedFrames
        objects, so always check what type of object has been yielded.

        Pass either a frame object or a traceback object,
        and optionally an Options object to configure.

        If collapse_repeated_frames is false, every entry in the stack
        is yielded as a FrameInfo and nothing is collapsed.
        """
        stack = list(iter_stack(frame_or_tb))

        # Reverse the stack from a frame so that it's in the same order
        # as the order from a traceback, which is the order of a printed
        # traceback when read top to bottom (most recent call last)
        if is_frame(frame_or_tb):
            stack = stack[::-1]

        def mapper(f):
            return cls(f, options)

        if not collapse_repeated_frames:
            yield from map(mapper, stack)
            return

        def _frame_key(x):
            # Two stack entries are considered the same if they're at
            # the same line of the same code object
            frame, lineno = frame_and_lineno(x)
            return frame.f_code, lineno

        yield from collapse_repeated(
            stack,
            mapper=mapper,
            collapser=RepeatedFrames,
            key=_frame_key,
        )

    @cached_property
    def scope_pieces(self) -> List[range]:
        """
        All the pieces (ranges of lines) contained in this object's .scope,
        unless there is no .scope (because the source isn't valid Python syntax)
        in which case it returns all the pieces in the source file, each containing one line.
        """
        if not self.scope:
            return self.source.pieces

        scope_start, scope_end = line_range(self.scope)
        return [
            piece
            for piece in self.source.pieces
            if scope_start <= piece.start and piece.stop <= scope_end
        ]

    @cached_property
    def filename(self) -> str:
        """
        A hopefully absolute file path derived from .code.co_filename,
        the current working directory, and sys.path.
        Code based on ipython.
        """
        result = self.code.co_filename

        # Already absolute, or a pseudo-filename such as <stdin>: nothing to resolve
        if (
                os.path.isabs(result) or
                (
                        result.startswith("<") and
                        result.endswith(">")
                )
        ):
            return result

        # Try to make the filename absolute by trying all
        # sys.path entries (which is also what linecache does)
        # as well as the current working directory
        for dirname in ["."] + list(sys.path):
            try:
                fullname = os.path.join(dirname, result)
                if os.path.isfile(fullname):
                    return os.path.abspath(fullname)
            except Exception:
                # Just in case that sys.path contains very
                # strange entries...
                pass

        # Nothing matched, so return the name unchanged
        return result

    @cached_property
    def executing_piece(self) -> range:
        """
        The piece (range of lines) containing the line currently being executed
        by the interpreter in this frame.
        """
        return only(
            piece
            for piece in self.scope_pieces
            if self.lineno in piece
        )

    @cached_property
    def included_pieces(self) -> List[range]:
        """
        The list of pieces (ranges of lines) to display for this frame.
        Consists of .executing_piece, surrounding context pieces
        determined by .options.before and .options.after,
        and the function signature if a function is being executed and
        .options.include_signature is True (in which case this might not
        be a contiguous range of pieces).
        Always a subset of .scope_pieces.
        """
        scope_pieces = self.scope_pieces
        if not self.scope_pieces:
            return []

        pos = scope_pieces.index(self.executing_piece)
        pieces_start = max(0, pos - self.options.before)
        pieces_end = pos + 1 + self.options.after
        pieces = scope_pieces[pieces_start:pieces_end]

        # The first piece of the scope is used as the signature, and it only
        # needs adding if the slice above didn't already start with it
        if (
                self.options.include_signature
                and not self.code.co_name.startswith('<')
                and isinstance(self.scope, (ast.FunctionDef, ast.AsyncFunctionDef))
                and pieces_start > 0
        ):
            pieces.insert(0, scope_pieces[0])

        return pieces

    @cached_property
    def _executing_node_common_indent(self) -> int:
        """
        The common minimal indentation shared by the markers intended
        for an exception node that spans multiple lines.

        Intended to be used only internally.
        """
        indents = []
        lines = [line for line in self.lines if isinstance(line, Line)]

        for line in lines:
            for rang in line._raw_executing_node_ranges():
                # Column of the first non-whitespace character in the line
                begin_text = len(line.text) - len(line.text.lstrip())
                indent = max(rang.start, begin_text)
                indents.append(indent)

        return min(indents) if indents else 0

    @cached_property
    def lines(self) -> List[Union[Line, LineGap]]:
        """
        A list of lines to display, determined by options.
        The objects yielded either have type Line or are the singleton LINE_GAP.
        Always check the type that you're dealing with when iterating.

        LINE_GAP can be created in two ways:
            - by truncating a piece of context that's too long, determined by
                .options.max_lines_per_piece
            - immediately after the signature piece if Options.include_signature is true
              and the following piece isn't already part of the included pieces.

        The Line objects are all within the ranges from .included_pieces.
        Each of them gets its .leading_indent set here to the indentation
        common to all the non-blank lines in the result.
        """
        pieces = self.included_pieces
        if not pieces:
            return []

        result = []
        for i, piece in enumerate(pieces):
            # The signature piece was prepended and isn't adjacent to what follows
            if (
                    i == 1
                    and self.scope
                    and pieces[0] == self.scope_pieces[0]
                    and pieces[1] != self.scope_pieces[1]
            ):
                result.append(LINE_GAP)

            lines = [Line(self, lineno) for lineno in piece]  # type: List[Line]
            # The executing piece is always shown in full
            if piece != self.executing_piece:
                lines = truncate(
                    lines,
                    max_length=self.options.max_lines_per_piece,
                    middle=[LINE_GAP],
                )
            result.extend(lines)

        real_lines = [
            line
            for line in result
            if isinstance(line, Line)
        ]

        text = "\n".join(
            line.text
            for line in real_lines
        )

        # Splitting on "\n" (rather than splitlines) keeps exactly one entry
        # per Line, even when the joined text is empty or ends in blank lines.
        dedented_lines = dedent(text).split("\n")

        # Measure the indent on the first non-blank line: dedent() empties
        # whitespace-only lines, so measuring on one of those would report its
        # whole length as the common indent. If every line is blank there's
        # nothing to strip.
        leading_indent = 0
        for line, dedented in zip(real_lines, dedented_lines):
            if dedented.strip():
                leading_indent = len(line.text) - len(dedented)
                break

        for line in real_lines:
            line.leading_indent = leading_indent

        return result

    @cached_property
    def scope(self) -> Optional[ast.AST]:
        """
        The AST node of the innermost function, class or module being executed.
        None if the source has no AST or no executing statement was found.
        """
        if not self.source.tree or not self.executing.statements:
            return None

        stmt = list(self.executing.statements)[0]
        while True:
            # Get the parent first in case the original statement is already
            # a function definition, e.g. if we're calling a decorator
            # In that case we still want the surrounding scope, not that function
            stmt = stmt.parent
            if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)):
                return stmt

    @cached_property
    def _pygmented_scope_lines(self) -> Optional[Tuple[int, List[str]]]:
        """
        A pair (start_line, lines) where lines is the result of passing the
        source text of .scope through _pygmented_with_ranges using
        .options.pygments_formatter, and start_line is the first line number
        of the scope.

        Requires both a formatter (ValueError otherwise) and a scope.
        """
        # noinspection PyUnresolvedReferences
        from pygments.formatters import HtmlFormatter

        formatter = self.options.pygments_formatter
        scope = self.scope
        assert_(formatter, ValueError("Must set a pygments formatter in Options"))
        assert_(scope)

        if isinstance(formatter, HtmlFormatter):
            formatter.nowrap = True

        atok = self.source.asttokens()
        node = self.executing.node
        # Only pass on the executing node's range if the style was made
        # by style_with_executing_node
        if node and getattr(formatter.style, "for_executing_node", False):
            # Make the node's text range relative to the start of the scope's text
            scope_start = atok.get_text_range(scope)[0]
            start, end = atok.get_text_range(node)
            start -= scope_start
            end -= scope_start
            ranges = [(start, end)]
        else:
            ranges = []

        code = atok.get_text(scope)
        lines = _pygmented_with_ranges(formatter, code, ranges)

        start_line = line_range(scope)[0]

        return start_line, lines

    @cached_property
    def variables(self) -> List[Variable]:
        """
        All Variable objects whose nodes are contained within .scope
        and whose values could be safely evaluated by pure_eval.
        """
        if not self.scope:
            return []

        evaluator = Evaluator.from_frame(self.frame)
        scope = self.scope
        node_values = [
            pair
            for pair in evaluator.find_expressions(scope)
            if is_expression_interesting(*pair)
        ]  # type: List[Tuple[ast.AST, Any]]

        # Also include the arguments of the function being executed,
        # as far as the evaluator has a value for their names
        if isinstance(scope, (ast.FunctionDef, ast.AsyncFunctionDef)):
            for node in ast.walk(scope.args):
                if not isinstance(node, ast.arg):
                    continue
                name = node.arg
                try:
                    value = evaluator.names[name]
                except KeyError:
                    pass
                else:
                    node_values.append((node, value))

        # Group equivalent nodes together
        def get_text(n):
            if isinstance(n, ast.arg):
                return n.arg
            else:
                return self.source.asttokens().get_text(n)

        def normalise_node(n):
            try:
                # Add parens to avoid syntax errors for multiline expressions
                return ast.parse('(' + get_text(n) + ')')
            except Exception:
                return n

        grouped = group_by_key_func(
            node_values,
            lambda nv: ast.dump(normalise_node(nv[0])),
        )

        result = []
        for group in grouped.values():
            nodes, values = zip(*group)
            # The value and text of the first node stand for the whole group
            value = values[0]
            text = get_text(nodes[0])
            if not text:
                continue
            result.append(Variable(text, nodes, value))

        return result

    @cached_property
    def variables_by_lineno(self) -> Mapping[int, List[Tuple[Variable, ast.AST]]]:
        """
        A mapping from 1-based line numbers to lists of pairs:
            - A Variable object
            - A specific AST node from the variable's .nodes list that's
                in the line at that line number.
        """
        result = defaultdict(list)
        for var in self.variables:
            for node in var.nodes:
                # A node spanning several lines is listed under each of them
                for lineno in range(*line_range(node)):
                    result[lineno].append((var, node))
        return result

    @cached_property
    def variables_in_lines(self) -> List[Variable]:
        """
        A list of Variable objects contained within the lines returned by .lines.
        """
        return unique_in_order(
            var
            for line in self.lines
            if isinstance(line, Line)
            for var, node in self.variables_by_lineno[line.lineno]
        )

    @cached_property
    def variables_in_executing_piece(self) -> List[Variable]:
        """
        A list of Variable objects contained within the lines
        in the range returned by .executing_piece.
        """
        return unique_in_order(
            var
            for lineno in self.executing_piece
            for var, node in self.variables_by_lineno[lineno]
        )