# -*- coding: utf-8 -*- # # Copyright © Spyder Project Contributors # Licensed under the terms of the MIT License # (see spyder/__init__.py for details) """Internal shell widget : PythonShellWidget + Interpreter""" # pylint: disable=C0103 # pylint: disable=R0903 # pylint: disable=R0911 # pylint: disable=R0201 #FIXME: Internal shell MT: for i in range(100000): print i -> bug # Standard library imports from time import time import os import threading # Third party imports from qtpy.QtCore import QEventLoop, QObject, Signal, Slot from qtpy.QtWidgets import QMessageBox from spyder_kernels.utils.dochelpers import (getargtxt, getdoc, getobjdir, getsource) # Local imports from spyder import get_versions from spyder.api.translations import get_translation from spyder.plugins.console.utils.interpreter import Interpreter from spyder.py3compat import (builtins, to_binary_string, to_text_string) from spyder.utils.icon_manager import ima from spyder.utils import programs from spyder.utils.misc import get_error_match, getcwd_or_home from spyder.utils.qthelpers import create_action from spyder.plugins.console.widgets.shell import PythonShellWidget from spyder.plugins.variableexplorer.widgets.objecteditor import oedit from spyder.config.base import get_conf_path, get_debug_level # Localization _ = get_translation('spyder') builtins.oedit = oedit def create_banner(message): """Create internal shell banner""" if message is None: versions = get_versions() return 'Python %s %dbits [%s]'\ % (versions['python'], versions['bitness'], versions['system']) else: return message class SysOutput(QObject): """Handle standard I/O queue""" data_avail = Signal() def __init__(self): QObject.__init__(self) self.queue = [] self.lock = threading.Lock() def write(self, val): self.lock.acquire() self.queue.append(val) self.lock.release() self.data_avail.emit() def empty_queue(self): self.lock.acquire() s = "".join(self.queue) self.queue = [] self.lock.release() return s # We need to add this method to fix spyder-ide/spyder#1789. def flush(self): pass # This is needed to fix spyder-ide/spyder#2984. @property def closed(self): return False class WidgetProxyData(object): pass class WidgetProxy(QObject): """Handle Shell widget refresh signal""" sig_new_prompt = Signal(str) sig_set_readonly = Signal(bool) sig_edit = Signal(str, bool) sig_wait_input = Signal(str) def __init__(self, input_condition): QObject.__init__(self) # External editor self._gotoline = None self._path = None self.input_data = None self.input_condition = input_condition def new_prompt(self, prompt): self.sig_new_prompt.emit(prompt) def set_readonly(self, state): self.sig_set_readonly.emit(state) def edit(self, filename, external_editor=False): self.sig_edit.emit(filename, external_editor) def data_available(self): """Return True if input data is available""" return self.input_data is not WidgetProxyData def wait_input(self, prompt=''): self.input_data = WidgetProxyData self.sig_wait_input.emit(prompt) def end_input(self, cmd): self.input_condition.acquire() self.input_data = cmd self.input_condition.notify() self.input_condition.release() class InternalShell(PythonShellWidget): """Shell base widget: link between PythonShellWidget and Interpreter""" # --- Signals # This signal is emitted when the buffer is flushed sig_refreshed = Signal() # Request to show a status message on the main window sig_show_status_requested = Signal(str) # This signal emits a parsed error traceback text so we can then # request opening the file that traceback comes from in the Editor. sig_go_to_error_requested = Signal(str) # TODO: I think this is not being used now? sig_focus_changed = Signal() def __init__(self, parent=None, namespace=None, commands=[], message=None, max_line_count=300, exitfunc=None, profile=False, multithreaded=True): super().__init__(parent, get_conf_path('history_internal.py'), profile=profile) self.multithreaded = multithreaded self.setMaximumBlockCount(max_line_count) # Allow raw_input support: self.input_loop = None self.input_mode = False # KeyboardInterrupt support self.interrupted = False # used only for not-multithreaded mode self.sig_keyboard_interrupt.connect(self.keyboard_interrupt) # Code completion / calltips # keyboard events management self.eventqueue = [] # Init interpreter self.exitfunc = exitfunc self.commands = commands self.message = message self.interpreter = None # Clear status bar self.sig_show_status_requested.emit('') # Embedded shell -- requires the monitor (which installs the # 'open_in_spyder' function in builtins) if hasattr(builtins, 'open_in_spyder'): self.sig_go_to_error_requested.connect( self.open_with_external_spyder) #------ Interpreter def start_interpreter(self, namespace): """Start Python interpreter.""" self.clear() if self.interpreter is not None: self.interpreter.closing() self.interpreter = Interpreter(namespace, self.exitfunc, SysOutput, WidgetProxy, get_debug_level()) self.interpreter.stdout_write.data_avail.connect(self.stdout_avail) self.interpreter.stderr_write.data_avail.connect(self.stderr_avail) self.interpreter.widget_proxy.sig_set_readonly.connect(self.setReadOnly) self.interpreter.widget_proxy.sig_new_prompt.connect(self.new_prompt) self.interpreter.widget_proxy.sig_edit.connect(self.edit_script) self.interpreter.widget_proxy.sig_wait_input.connect(self.wait_input) if self.multithreaded: self.interpreter.start() # Interpreter banner banner = create_banner(self.message) self.write(banner, prompt=True) # Initial commands for cmd in self.commands: self.run_command(cmd, history=False, new_prompt=False) # First prompt self.new_prompt(self.interpreter.p1) self.sig_refreshed.emit() return self.interpreter def exit_interpreter(self): """Exit interpreter""" self.interpreter.exit_flag = True if self.multithreaded: self.interpreter.stdin_write.write(to_binary_string('\n')) self.interpreter.restore_stds() def edit_script(self, filename, external_editor): filename = to_text_string(filename) if external_editor: self.external_editor(filename) else: self.parent().edit_script(filename) def stdout_avail(self): """Data is available in stdout, let's empty the queue and write it!""" data = self.interpreter.stdout_write.empty_queue() if data: self.write(data) def stderr_avail(self): """Data is available in stderr, let's empty the queue and write it!""" data = self.interpreter.stderr_write.empty_queue() if data: self.write(data, error=True) self.flush(error=True) #------Raw input support def wait_input(self, prompt=''): """Wait for input (raw_input support)""" self.new_prompt(prompt) self.setFocus() self.input_mode = True self.input_loop = QEventLoop() self.input_loop.exec_() self.input_loop = None def end_input(self, cmd): """End of wait_input mode""" self.input_mode = False self.input_loop.exit() self.interpreter.widget_proxy.end_input(cmd) #----- Menus, actions, ... def setup_context_menu(self): """Reimplement PythonShellWidget method""" PythonShellWidget.setup_context_menu(self) self.help_action = create_action(self, _("Help..."), icon=ima.icon('DialogHelpButton'), triggered=self.help) self.menu.addAction(self.help_action) @Slot() def help(self): """Help on Spyder console""" QMessageBox.about(self, _("Help"), """%s
%s
edit foobar.py
%s
xedit foobar.py
%s
run foobar.py
%s
clear x, y
%s
!ls
%s
object?
%s
result = oedit(object)
""" % (_('Shell special commands:'),
_('Internal editor:'),
_('External editor:'),
_('Run script:'),
_('Remove references:'),
_('System commands:'),
_('Python help:'),
_('GUI-based editor:')))
#------ External editing
def open_with_external_spyder(self, text):
"""Load file in external Spyder's editor, if available
This method is used only for embedded consoles
(could also be useful if we ever implement the magic %edit command)"""
match = get_error_match(to_text_string(text))
if match:
fname, lnb = match.groups()
builtins.open_in_spyder(fname, int(lnb))
def set_external_editor(self, path, gotoline):
"""Set external editor path and gotoline option."""
self._path = path
self._gotoline = gotoline
def external_editor(self, filename, goto=-1):
"""
Edit in an external editor.
Recommended: SciTE (e.g. to go to line where an error did occur).
"""
editor_path = self._path
goto_option = self._gotoline
if os.path.isfile(editor_path):
try:
args = [filename]
if goto > 0 and goto_option:
args.append('%s%d'.format(goto_option, goto))
programs.run_program(editor_path, args)
except OSError:
self.write_error("External editor was not found:"
" %s\n" % editor_path)
#------ I/O
def flush(self, error=False, prompt=False):
"""Reimplement ShellBaseWidget method"""
PythonShellWidget.flush(self, error=error, prompt=prompt)
if self.interrupted:
self.interrupted = False
raise KeyboardInterrupt
#------ Clear terminal
def clear_terminal(self):
"""Reimplement ShellBaseWidget method"""
self.clear()
self.new_prompt(self.interpreter.p2 if self.interpreter.more else self.interpreter.p1)
#------ Keyboard events
def on_enter(self, command):
"""on_enter"""
if self.profile:
# Simple profiling test
t0 = time()
for _ in range(10):
self.execute_command(command)
self.insert_text(u"\n<Δt>=%dms\n" % (1e2*(time()-t0)))
self.new_prompt(self.interpreter.p1)
else:
self.execute_command(command)
self.__flush_eventqueue()
def keyPressEvent(self, event):
"""
Reimplement Qt Method
Enhanced keypress event handler
"""
if self.preprocess_keyevent(event):
# Event was accepted in self.preprocess_keyevent
return
self.postprocess_keyevent(event)
def __flush_eventqueue(self):
"""Flush keyboard event queue"""
while self.eventqueue:
past_event = self.eventqueue.pop(0)
self.postprocess_keyevent(past_event)
#------ Command execution
def keyboard_interrupt(self):
"""Simulate keyboard interrupt"""
if self.multithreaded:
self.interpreter.raise_keyboard_interrupt()
else:
if self.interpreter.more:
self.write_error("\nKeyboardInterrupt\n")
self.interpreter.more = False
self.new_prompt(self.interpreter.p1)
self.interpreter.resetbuffer()
else:
self.interrupted = True
def execute_lines(self, lines):
"""
Execute a set of lines as multiple command
lines: multiple lines of text to be executed as single commands
"""
for line in lines.splitlines():
stripped_line = line.strip()
if stripped_line.startswith('#'):
continue
self.write(line+os.linesep, flush=True)
self.execute_command(line+"\n")
self.flush()
def execute_command(self, cmd):
"""
Execute a command
cmd: one-line command only, with '\n' at the end
"""
if self.input_mode:
self.end_input(cmd)
return
if cmd.endswith('\n'):
cmd = cmd[:-1]
# cls command
if cmd == 'cls':
self.clear_terminal()
return
self.run_command(cmd)
def run_command(self, cmd, history=True, new_prompt=True):
"""Run command in interpreter"""
if not cmd:
cmd = ''
else:
if history:
self.add_to_history(cmd)
if not self.multithreaded:
if 'input' not in cmd:
self.interpreter.stdin_write.write(
to_binary_string(cmd + '\n'))
self.interpreter.run_line()
self.sig_refreshed.emit()
else:
self.write(_('In order to use commands like "raw_input" '
'or "input" run Spyder with the multithread '
'option (--multithread) from a system terminal'),
error=True)
else:
self.interpreter.stdin_write.write(to_binary_string(cmd + '\n'))
#------ Code completion / Calltips
def _eval(self, text):
"""Is text a valid object?"""
return self.interpreter.eval(text)
def get_dir(self, objtxt):
"""Return dir(object)"""
obj, valid = self._eval(objtxt)
if valid:
return getobjdir(obj)
def get_globals_keys(self):
"""Return shell globals() keys"""
return list(self.interpreter.namespace.keys())
def get_cdlistdir(self):
"""Return shell current directory list dir"""
return os.listdir(getcwd_or_home())
def iscallable(self, objtxt):
"""Is object callable?"""
obj, valid = self._eval(objtxt)
if valid:
return callable(obj)
def get_arglist(self, objtxt):
"""Get func/method argument list"""
obj, valid = self._eval(objtxt)
if valid:
return getargtxt(obj)
def get__doc__(self, objtxt):
"""Get object __doc__"""
obj, valid = self._eval(objtxt)
if valid:
return obj.__doc__
def get_doc(self, objtxt):
"""Get object documentation dictionary"""
obj, valid = self._eval(objtxt)
if valid:
return getdoc(obj)
def get_source(self, objtxt):
"""Get object source"""
obj, valid = self._eval(objtxt)
if valid:
return getsource(obj)
def is_defined(self, objtxt, force_import=False):
"""Return True if object is defined"""
return self.interpreter.is_defined(objtxt, force_import)