# *** encoding: utf-8 ***
"""
An :class:`~.KeyProcessor` receives callbacks for the keystrokes parsed from
the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance.

The `KeyProcessor` will according to the implemented keybindings call the
correct callbacks when new key presses are fed through `feed`.
"""
import weakref
from asyncio import Task, sleep
from collections import deque
from typing import TYPE_CHECKING, Any, Deque, Generator, List, Optional, Union

from prompt_toolkit.application.current import get_app
from prompt_toolkit.enums import EditingMode
from prompt_toolkit.filters.app import vi_navigation_mode
from prompt_toolkit.keys import Keys
from prompt_toolkit.utils import Event

from .key_bindings import Binding, KeyBindingsBase

if TYPE_CHECKING:
    from prompt_toolkit.application import Application
    from prompt_toolkit.buffer import Buffer


__all__ = [
    "KeyProcessor",
    "KeyPress",
    "KeyPressEvent",
]


class KeyPress:
    """
    A single key press, as fed into the :class:`KeyProcessor`.

    :param key: A `Keys` instance or text (one character).
    :param data: The received string on stdin. (Often vt100 escape codes.)
        When omitted, it defaults to ``key.value`` for a `Keys` instance and
        to the character itself for a one character string.
    """

    def __init__(self, key: Union[Keys, str], data: Optional[str] = None) -> None:
        assert isinstance(key, Keys) or len(key) == 1

        if data is None:
            if isinstance(key, Keys):
                data = key.value
            else:
                data = key  # 'key' is a one character string.

        self.key = key
        self.data = data

    def __repr__(self) -> str:
        return "%s(key=%r, data=%r)" % (self.__class__.__name__, self.key, self.data)

    def __eq__(self, other: object) -> bool:
        # Two key presses are equal when both `key` and `data` are equal.
        if not isinstance(other, KeyPress):
            return False
        return self.key == other.key and self.data == other.data


# Helper object to indicate flush operation in the KeyProcessor.
# It is always compared by identity (`is _Flush`), never by value.
# NOTE: the implementation is very similar to the VT100 parser.
_Flush = KeyPress("?", data="_Flush")


class KeyProcessor:
    r"""
    Statemachine that receives :class:`KeyPress` instances and according to the
    key bindings in the given :class:`KeyBindings`, calls the matching handlers.

    ::

        p = KeyProcessor(key_bindings)

        # Send keys into the processor.
        p.feed(KeyPress(Keys.ControlX, '\x18'))
        p.feed(KeyPress(Keys.ControlC, '\x03'))

        # Process all the keys in the queue.
        p.process_keys()

        # Now the ControlX-ControlC callback will be called if this sequence is
        # registered in the key bindings.

    :param key_bindings: `KeyBindingsBase` instance.
    """

    def __init__(self, key_bindings: KeyBindingsBase) -> None:
        self._bindings = key_bindings

        # Events fired around every key press that is neither the flush
        # marker nor a CPR response (see `process_keys`).
        self.before_key_press = Event(self)
        self.after_key_press = Event(self)

        # Background task of the auto flush timeout (see `_start_timeout`).
        self._flush_wait_task: Optional[Task[None]] = None

        self.reset()

    def reset(self) -> None:
        """
        Reset the processor: forget the previous key sequence and handler,
        drop the queued and buffered keys, clear the repetition argument and
        start a fresh `_process` generator.
        """
        self._previous_key_sequence: List[KeyPress] = []
        self._previous_handler: Optional[Binding] = None

        # The queue of keys not yet sent to our _process generator/state machine.
        self.input_queue: Deque[KeyPress] = deque()

        # The key buffer that is matched in the generator state machine.
        # (This is at most the amount of keys that make up for one key binding.)
        self.key_buffer: List[KeyPress] = []

        #: Readline argument (for repetition of commands.)
        #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html
        self.arg: Optional[str] = None

        # Start the processor coroutine. The initial `send(None)` advances the
        # generator to its first `yield`, so that it can receive keys.
        self._process_coroutine = self._process()
        self._process_coroutine.send(None)  # type: ignore

    def _get_matches(self, key_presses: List[KeyPress]) -> List[Binding]:
        """
        For a list of :class:`KeyPress` instances. Give the matching handlers
        that would handle this.

        Only bindings whose filter currently evaluates to true are returned.
        """
        keys = tuple(k.key for k in key_presses)

        # Try match, with mode flag
        return [b for b in self._bindings.get_bindings_for_keys(keys) if b.filter()]

    def _is_prefix_of_longer_match(self, key_presses: List[KeyPress]) -> bool:
        """
        For a list of :class:`KeyPress` instances. Return True if there is any
        active handler that is bound to a longer key sequence which starts
        with these keys.
        """
        keys = tuple(k.key for k in key_presses)

        # Get the filters for all the key bindings that have a longer match.
        # Note that we transform it into a `set`, because we don't care about
        # the actual bindings and executing it more than once doesn't make
        # sense. (Many key bindings share the same filter.)
        filters = {
            b.filter for b in self._bindings.get_bindings_starting_with_keys(keys)
        }

        # When any key binding is active, return True.
        return any(f() for f in filters)

    def _process(self) -> Generator[None, KeyPress, None]:
        """
        Coroutine implementing the key match algorithm. Key strokes are sent
        into this generator, and it calls the appropriate handlers.

        Sending the `_Flush` marker forces the buffered keys to be matched
        right away, without waiting for a possible longer match.
        """
        buffer = self.key_buffer
        retry = False

        while True:
            flush = False

            if retry:
                # The previous round consumed only part of the buffer. Match
                # what is left before waiting for a new key.
                retry = False
            else:
                key = yield
                if key is _Flush:
                    flush = True
                else:
                    buffer.append(key)

            # If we have some key presses, check for matches.
            if buffer:
                matches = self._get_matches(buffer)

                if flush:
                    is_prefix_of_longer_match = False
                else:
                    is_prefix_of_longer_match = self._is_prefix_of_longer_match(
                        buffer
                    )

                # When eager matches were found, give priority to them and also
                # ignore all the longer matches.
                eager_matches = [m for m in matches if m.eager()]

                if eager_matches:
                    matches = eager_matches
                    is_prefix_of_longer_match = False

                # Exact matches found, call handler.
                if not is_prefix_of_longer_match and matches:
                    self._call_handler(matches[-1], key_sequence=buffer[:])
                    del buffer[:]  # Keep reference.

                # No match found.
                elif not is_prefix_of_longer_match and not matches:
                    retry = True
                    found = False

                    # Loop over the input, try longest match first and shift.
                    for i in range(len(buffer), 0, -1):
                        matches = self._get_matches(buffer[:i])
                        if matches:
                            self._call_handler(matches[-1], key_sequence=buffer[:i])
                            del buffer[:i]
                            found = True
                            break

                    # Nothing matched at all: drop the first key and retry
                    # with the remainder.
                    if not found:
                        del buffer[:1]

    def feed(self, key_press: KeyPress, first: bool = False) -> None:
        """
        Add a new :class:`KeyPress` to the input queue.
        (Don't forget to call `process_keys` in order to process the queue.)

        :param first: If true, insert before everything else.
        """
        if first:
            self.input_queue.appendleft(key_press)
        else:
            self.input_queue.append(key_press)

    def feed_multiple(self, key_presses: List[KeyPress], first: bool = False) -> None:
        """
        Add several :class:`KeyPress` instances to the input queue, keeping
        their relative order.

        :param first: If true, insert before everything else.
        """
        if first:
            # `extendleft` inserts one by one at the left, which reverses the
            # order; reverse up front to compensate.
            self.input_queue.extendleft(reversed(key_presses))
        else:
            self.input_queue.extend(key_presses)

    def process_keys(self) -> None:
        """
        Process all the keys in the `input_queue`.
        (To be called after `feed`.)

        Note: because of the `feed`/`process_keys` separation, it is
              possible to call `feed` from inside a key binding.
              This function keeps looping until the queue is empty.
        """
        app = get_app()

        def not_empty() -> bool:
            # When the application result is set, stop processing keys. (E.g.
            # if ENTER was received, followed by a few additional key strokes,
            # leave the other keys in the queue.)
            if app.is_done:
                # But if there are still CPRResponse keys in the queue, these
                # need to be processed.
                return any(k.key == Keys.CPRResponse for k in self.input_queue)
            else:
                return bool(self.input_queue)

        def get_next() -> KeyPress:
            if app.is_done:
                # Only process CPR responses. Everything else is typeahead.
                cpr = [k for k in self.input_queue if k.key == Keys.CPRResponse][0]
                self.input_queue.remove(cpr)
                return cpr
            else:
                return self.input_queue.popleft()

        keys_processed = False
        is_flush = False

        while not_empty():
            keys_processed = True

            # Process next key.
            key_press = get_next()

            is_flush = key_press is _Flush
            is_cpr = key_press.key == Keys.CPRResponse

            if not is_flush and not is_cpr:
                self.before_key_press.fire()

            try:
                self._process_coroutine.send(key_press)
            except Exception:
                # If for some reason something goes wrong in the parser, (maybe
                # an exception was raised) restart the processor for next time.
                self.reset()
                self.empty_queue()
                app.invalidate()
                raise

            if not is_flush and not is_cpr:
                self.after_key_press.fire()

        if keys_processed:
            # Invalidate user interface.
            app.invalidate()

        # Skip timeout if the last key was flush.
        if not is_flush:
            self._start_timeout()

    def empty_queue(self) -> List[KeyPress]:
        """
        Empty the input queue. Return the unprocessed input.
        """
        key_presses = list(self.input_queue)
        self.input_queue.clear()

        # Filter out CPRs. We don't want to return these.
        return [k for k in key_presses if k.key != Keys.CPRResponse]

    def _call_handler(self, handler: Binding, key_sequence: List[KeyPress]) -> None:
        """
        Call the given key binding handler for `key_sequence`.

        The pending repetition argument is consumed and passed along in the
        :class:`KeyPressEvent`. Afterwards, the key sequence and the handler
        are remembered as the previous ones, and the keys are appended to the
        Emacs/Vi macro that is being recorded, if any.

        :param handler: The `Binding` to call.
        :param key_sequence: The key presses that triggered this binding.
        """
        app = get_app()
        was_recording_emacs = app.emacs_state.is_recording
        was_recording_vi = bool(app.vi_state.recording_register)
        was_temporary_navigation_mode = app.vi_state.temporary_navigation_mode

        # Consume the argument: it applies to this handler call only.
        arg = self.arg
        self.arg = None

        event = KeyPressEvent(
            weakref.ref(self),
            arg=arg,
            key_sequence=key_sequence,
            previous_key_sequence=self._previous_key_sequence,
            is_repeat=(handler == self._previous_handler),
        )

        # Save the state of the current buffer.
        if handler.save_before(event):
            event.app.current_buffer.save_to_undo_stack()

        # Call handler.
        from prompt_toolkit.buffer import EditReadOnlyBuffer

        try:
            handler.call(event)
            self._fix_vi_cursor_position(event)

        except EditReadOnlyBuffer:
            # When a key binding does an attempt to change a buffer which is
            # read-only, we can ignore that. We sound a bell and go on.
            app.output.bell()

        if was_temporary_navigation_mode:
            self._leave_vi_temp_navigation_mode(event)

        self._previous_key_sequence = key_sequence
        self._previous_handler = handler

        # Record the key sequence in our macro. (Only if we're in macro mode
        # before and after executing the key.)
        if handler.record_in_macro():
            if app.emacs_state.is_recording and was_recording_emacs:
                recording = app.emacs_state.current_recording
                # Should always be true, given that `was_recording_emacs` is
                # set.
                if recording is not None:
                    recording.extend(key_sequence)

            if app.vi_state.recording_register and was_recording_vi:
                app.vi_state.current_recording += "".join(
                    k.data for k in key_sequence
                )

    def _fix_vi_cursor_position(self, event: "KeyPressEvent") -> None:
        """
        After every command, make sure that if we are in Vi navigation mode, we
        never put the cursor after the last character of a line. (Unless it's
        an empty line.)
        """
        app = event.app
        buff = app.current_buffer
        preferred_column = buff.preferred_column

        if (
            vi_navigation_mode()
            and buff.document.is_cursor_at_the_end_of_line
            and len(buff.document.current_line) > 0
        ):
            buff.cursor_position -= 1

            # Set the preferred_column for arrow up/down again.
            # (This was cleared after changing the cursor position.)
            buff.preferred_column = preferred_column

    def _leave_vi_temp_navigation_mode(self, event: "KeyPressEvent") -> None:
        """
        If we're in Vi temporary navigation (normal) mode, return to
        insert/replace mode after executing one action.
        """
        app = event.app

        if app.editing_mode == EditingMode.VI:
            # Not waiting for a text object and no argument has been given.
            if app.vi_state.operator_func is None and self.arg is None:
                app.vi_state.temporary_navigation_mode = False

    def _start_timeout(self) -> None:
        """
        Start auto flush timeout. Similar to Vim's `timeoutlen` option.

        Start a background coroutine with a timer. When this timeout expires
        and no key was pressed in the meantime, we flush all data in the queue
        and call the appropriate key binding handlers.

        Nothing is started when the application's `timeoutlen` is `None`.
        """
        app = get_app()
        timeout = app.timeoutlen

        if timeout is None:
            return

        async def wait() -> None:
            "Wait for timeout."
            # This sleep can be cancelled. In that case we don't flush.
            await sleep(timeout)

            if self.key_buffer:
                # (No keys pressed in the meantime.)
                flush_keys()

        def flush_keys() -> None:
            "Flush keys."
            self.feed(_Flush)
            self.process_keys()

        # Automatically flush keys. A timer that is still pending is replaced
        # by the new one.
        if self._flush_wait_task:
            self._flush_wait_task.cancel()
        self._flush_wait_task = app.create_background_task(wait())


class KeyPressEvent:
    """
    Key press event, delivered to key bindings.

    :param key_processor_ref: Weak reference to the `KeyProcessor`.
    :param arg: Repetition argument.
    :param key_sequence: List of `KeyPress` instances.
    :param previous_key_sequence: Previous list of `KeyPress` instances.
    :param is_repeat: True when the previous event was delivered to the same handler.
    """

    def __init__(
        self,
        key_processor_ref: "weakref.ReferenceType[KeyProcessor]",
        arg: Optional[str],
        key_sequence: List[KeyPress],
        previous_key_sequence: List[KeyPress],
        is_repeat: bool,
    ) -> None:
        self._key_processor_ref = key_processor_ref
        self.key_sequence = key_sequence
        self.previous_key_sequence = previous_key_sequence

        #: True when the previous key sequence was handled by the same handler.
        self.is_repeat = is_repeat

        self._arg = arg
        self._app = get_app()

    def __repr__(self) -> str:
        return "KeyPressEvent(arg=%r, key_sequence=%r, is_repeat=%r)" % (
            self.arg,
            self.key_sequence,
            self.is_repeat,
        )

    @property
    def data(self) -> str:
        """
        The `data` of the last key press in the key sequence.
        """
        return self.key_sequence[-1].data

    @property
    def key_processor(self) -> KeyProcessor:
        """
        The `KeyProcessor` that created this event.

        :raises Exception: When the weakly referenced processor is gone.
        """
        processor = self._key_processor_ref()
        if processor is None:
            raise Exception("KeyProcessor was lost. This should not happen.")
        return processor

    @property
    def app(self) -> "Application[Any]":
        """
        The current `Application` object.
        """
        return self._app

    @property
    def current_buffer(self) -> "Buffer":
        """
        The current buffer.
        """
        return self.app.current_buffer

    @property
    def arg(self) -> int:
        """
        Repetition argument.

        A lone "-" counts as -1, a missing argument as 1. A value of one
        million or more is replaced by 1.
        """
        if self._arg == "-":
            return -1

        result = int(self._arg or 1)

        # Don't exceed a million.
        if result >= 1000000:
            result = 1

        return result

    @property
    def arg_present(self) -> bool:
        """
        True if repetition argument was explicitly provided.
        """
        return self._arg is not None

    def append_to_arg_count(self, data: str) -> None:
        """
        Add digit to the input argument.

        The result is stored on the `KeyProcessor`, not on this event.

        :param data: the typed digit as string
        """
        assert data in "-0123456789"
        current = self._arg

        if data == "-":
            # A minus sign is only accepted at the very start of the argument.
            assert current is None or current == "-"
            result = data
        elif current is None:
            result = data
        else:
            result = "%s%s" % (current, data)

        self.key_processor.arg = result

    @property
    def cli(self) -> "Application[Any]":
        "For backward-compatibility."
        return self.app