# -*- coding: utf-8 -*- # # Copyright © Spyder Project Contributors # Licensed under the terms of the MIT License # (see spyder/__init__.py for details) """ IPython Console plugin based on QtConsole. """ # pylint: disable=C0103 # pylint: disable=R0903 # pylint: disable=R0911 # pylint: disable=R0201 # Standard library imports import os import os.path as osp import sys import traceback import uuid # Third party imports from jupyter_client.connect import find_connection_file from jupyter_core.paths import jupyter_config_dir, jupyter_runtime_dir from qtconsole.client import QtKernelClient from qtpy.QtCore import Qt, Signal, Slot from qtpy.QtGui import QColor from qtpy.QtWebEngineWidgets import WEBENGINE from qtpy.QtWidgets import (QActionGroup, QApplication, QHBoxLayout, QLabel, QMenu, QMessageBox, QVBoxLayout, QWidget) from traitlets.config.loader import Config, load_pyconfig_files from zmq.ssh import tunnel as zmqtunnel # Local imports from spyder.api.plugins import Plugins, SpyderPluginWidget from spyder.api.widgets.menus import SpyderMenu from spyder.config.base import (_, get_conf_path, get_home_dir, running_under_pytest) from spyder.config.gui import get_font from spyder.config.manager import CONF from spyder.plugins.ipythonconsole.confpage import IPythonConsoleConfigPage from spyder.plugins.ipythonconsole.utils.kernelspec import SpyderKernelSpec from spyder.plugins.ipythonconsole.utils.manager import SpyderKernelManager from spyder.plugins.ipythonconsole.utils.ssh import openssh_tunnel from spyder.plugins.ipythonconsole.utils.style import create_qss_style from spyder.plugins.ipythonconsole.widgets import ( ClientWidget, ConsoleRestartDialog, KernelConnectionDialog, PageControlWidget) from spyder.plugins.mainmenu.api import ( ApplicationMenus, ConsolesMenuSections, HelpMenuSections) from spyder.py3compat import is_string, to_text_string, PY2, PY38_OR_MORE from spyder.utils import encoding from spyder.utils.icon_manager import ima from spyder.utils import programs, sourcecode from spyder.utils.misc import get_error_match, remove_backslashes from spyder.utils.palette import QStylePalette from spyder.utils.programs import get_temp_dir from spyder.utils.qthelpers import MENU_SEPARATOR, add_actions, create_action from spyder.widgets.browser import FrameWebView from spyder.widgets.findreplace import FindReplace from spyder.widgets.tabs import Tabs MAIN_BG_COLOR = QStylePalette.COLOR_BACKGROUND_1 class IPythonConsole(SpyderPluginWidget): """ IPython Console plugin This is a widget with tabs where each one is a ClientWidget """ CONF_SECTION = 'ipython_console' CONFIGWIDGET_CLASS = IPythonConsoleConfigPage CONF_FILE = False DISABLE_ACTIONS_WHEN_HIDDEN = False # This is required for the new API NAME = 'ipython_console' REQUIRES = [Plugins.Console, Plugins.Preferences] OPTIONAL = [Plugins.Editor, Plugins.History] # Signals sig_focus_changed = Signal() sig_edit_goto_requested = Signal((str, int, str), (str, int, str, bool)) sig_pdb_state_changed = Signal(bool, dict) sig_shellwidget_created = Signal(object) """ This signal is emitted when a shellwidget is created. Parameters ---------- shellwidget: spyder.plugins.ipyconsole.widgets.shell.ShellWidget The shellwigdet. """ sig_shellwidget_deleted = Signal(object) """ This signal is emitted when a shellwidget is deleted/removed. Parameters ---------- shellwidget: spyder.plugins.ipyconsole.widgets.shell.ShellWidget The shellwigdet. """ sig_shellwidget_changed = Signal(object) """ This signal is emitted when the current shellwidget changes. Parameters ---------- shellwidget: spyder.plugins.ipyconsole.widgets.shell.ShellWidget The shellwigdet. """ sig_render_plain_text_requested = Signal(str) """ This signal is emitted to request a plain text help render. Parameters ---------- plain_text: str The plain text to render. """ sig_render_rich_text_requested = Signal(str, bool) """ This signal is emitted to request a rich text help render. Parameters ---------- rich_text: str The rich text. collapse: bool If the text contains collapsed sections, show them closed (True) or open (False). """ sig_help_requested = Signal(dict) """ This signal is emitted to request help on a given object `name`. Parameters ---------- help_data: dict Example `{'name': str, 'ignore_unknown': bool}`. """ sig_current_directory_changed = Signal(str) """ This signal is emitted when the current directory of the active shell widget has changed. Parameters ---------- working_directory: str The new working directory path. """ # Remove when this plugin is migrated sig_exception_occurred = Signal(dict) # Error messages permission_error_msg = _("The directory {} is not writable and it is " "required to create IPython consoles. Please " "make it writable.") def __init__(self, parent, testing=False, test_dir=None, test_no_stderr=False): """Ipython Console constructor.""" SpyderPluginWidget.__init__(self, parent) self.tabwidget = None self.menu_actions = None self.master_clients = 0 self.clients = [] self.filenames = [] self.mainwindow_close = False self.create_new_client_if_empty = True self.css_path = CONF.get('appearance', 'css_path') self.run_cell_filename = None self.interrupt_action = None self.add_actions_to_main_menus = True # Attrs for testing self.testing = testing self.test_dir = test_dir self.test_no_stderr = test_no_stderr # Create temp dir on testing to save kernel errors if self.test_dir is not None: if not osp.isdir(osp.join(test_dir)): os.makedirs(osp.join(test_dir)) layout = QVBoxLayout() layout.setSpacing(0) self.tabwidget = Tabs(self, menu=self._options_menu, actions=self.menu_actions, rename_tabs=True, split_char='/', split_index=0) if hasattr(self.tabwidget, 'setDocumentMode')\ and not sys.platform == 'darwin': # Don't set document mode to true on OSX because it generates # a crash when the console is detached from the main window # Fixes spyder-ide/spyder#561. self.tabwidget.setDocumentMode(True) self.tabwidget.currentChanged.connect(self.refresh_plugin) self.tabwidget.tabBar().tabMoved.connect(self.move_tab) self.tabwidget.tabBar().sig_name_changed.connect( self.rename_tabs_after_change) self.tabwidget.set_close_function(self.close_client) self.main.editor.sig_file_debug_message_requested.connect( self.print_debug_file_msg) if sys.platform == 'darwin': tab_container = QWidget() tab_container.setObjectName('tab-container') tab_layout = QHBoxLayout(tab_container) tab_layout.setContentsMargins(0, 0, 0, 0) tab_layout.addWidget(self.tabwidget) layout.addWidget(tab_container) else: layout.addWidget(self.tabwidget) # Info widget self.infowidget = FrameWebView(self) if WEBENGINE: self.infowidget.page().setBackgroundColor(QColor(MAIN_BG_COLOR)) else: self.infowidget.setStyleSheet( "background:{}".format(MAIN_BG_COLOR)) self.set_infowidget_font() layout.addWidget(self.infowidget) # Label to inform users how to get out of the pager self.pager_label = QLabel(_("Press Q to exit pager"), self) self.pager_label.setStyleSheet( f"background-color: {QStylePalette.COLOR_ACCENT_2};" f"color: {QStylePalette.COLOR_TEXT_1};" "margin: 0px 1px 4px 1px;" "padding: 5px;" "qproperty-alignment: AlignCenter;" ) self.pager_label.hide() layout.addWidget(self.pager_label) # Find/replace widget self.find_widget = FindReplace(self) self.find_widget.hide() self.register_widget_shortcuts(self.find_widget) layout.addWidget(self.find_widget) self.setLayout(layout) # Accepting drops self.setAcceptDrops(True) # Needed to start Spyder in Windows with Python 3.8 # See spyder-ide/spyder#11880 self._init_asyncio_patch() #------ SpyderPluginMixin API --------------------------------------------- def update_font(self): """Update font from Preferences""" font = self.get_font() for client in self.clients: client.set_font(font) def _apply_gui_plugin_settings(self, options, client): """Apply GUI related configurations to a client.""" # GUI options font_n = 'plugin_font' help_n = 'connect_to_oi' color_scheme_n = 'color_scheme_name' show_time_n = 'show_elapsed_time' reset_namespace_n = 'show_reset_namespace_warning' ask_before_restart_n = 'ask_before_restart' ask_before_closing_n = 'ask_before_closing' show_calltips_n = 'show_calltips' buffer_size_n = 'buffer_size' completion_type_n = 'completion_type' # Advanced GUI options in_prompt_n = 'in_prompt' out_prompt_n = 'out_prompt' # Client widgets control = client.get_control() sw = client.shellwidget if font_n in options: font_o = self.get_font() client.set_font(font_o) if help_n in options and control is not None: help_o = CONF.get('help', 'connect/ipython_console') control.set_help_enabled(help_o) if color_scheme_n in options: color_scheme_o = CONF.get('appearance', 'selected') client.set_color_scheme(color_scheme_o) if show_time_n in options: show_time_o = self.get_option(show_time_n) client.show_time_action.setChecked(show_time_o) client.set_elapsed_time_visible(show_time_o) if reset_namespace_n in options: reset_namespace_o = self.get_option(reset_namespace_n) client.reset_warning = reset_namespace_o if ask_before_restart_n in options: ask_before_restart_o = self.get_option(ask_before_restart_n) client.ask_before_restart = ask_before_restart_o if ask_before_closing_n in options: ask_before_closing_o = self.get_option(ask_before_closing_n) client.ask_before_closing = ask_before_closing_o if show_calltips_n in options: show_calltips_o = self.get_option(show_calltips_n) sw.set_show_calltips(show_calltips_o) if buffer_size_n in options: buffer_size_o = self.get_option(buffer_size_n) sw.set_buffer_size(buffer_size_o) if completion_type_n in options: completion_type_o = self.get_option(completion_type_n) completions = {0: "droplist", 1: "ncurses", 2: "plain"} sw._set_completion_widget(completions[completion_type_o]) # Advanced GUI options if in_prompt_n in options: in_prompt_o = self.get_option(in_prompt_n) sw.set_in_prompt(in_prompt_o) if out_prompt_n in options: out_prompt_o = self.get_option(out_prompt_n) sw.set_out_prompt(out_prompt_o) def _apply_mpl_plugin_settings(self, options, client): """Apply Matplotlib related configurations to a client.""" # Matplotlib options pylab_n = 'pylab' pylab_o = self.get_option(pylab_n) pylab_autoload_n = 'pylab/autoload' pylab_backend_n = 'pylab/backend' inline_backend_figure_format_n = 'pylab/inline/figure_format' inline_backend_resolution_n = 'pylab/inline/resolution' inline_backend_width_n = 'pylab/inline/width' inline_backend_height_n = 'pylab/inline/height' inline_backend_bbox_inches_n = 'pylab/inline/bbox_inches' # Client widgets sw = client.shellwidget if pylab_o: if pylab_backend_n in options or pylab_autoload_n in options: pylab_autoload_o = self.get_option(pylab_autoload_n) pylab_backend_o = self.get_option(pylab_backend_n) sw.set_matplotlib_backend(pylab_backend_o, pylab_autoload_o) if inline_backend_figure_format_n in options: inline_backend_figure_format_o = self.get_option( inline_backend_figure_format_n) sw.set_mpl_inline_figure_format(inline_backend_figure_format_o) if inline_backend_resolution_n in options: inline_backend_resolution_o = self.get_option( inline_backend_resolution_n) sw.set_mpl_inline_resolution(inline_backend_resolution_o) if (inline_backend_width_n in options or inline_backend_height_n in options): inline_backend_width_o = self.get_option( inline_backend_width_n) inline_backend_height_o = self.get_option( inline_backend_height_n) sw.set_mpl_inline_figure_size( inline_backend_width_o, inline_backend_height_o) if inline_backend_bbox_inches_n in options: inline_backend_bbox_inches_o = self.get_option( inline_backend_bbox_inches_n) sw.set_mpl_inline_bbox_inches(inline_backend_bbox_inches_o) def _apply_advanced_plugin_settings(self, options, client): """Apply advanced configurations to a client.""" # Advanced options greedy_completer_n = 'greedy_completer' jedi_completer_n = 'jedi_completer' autocall_n = 'autocall' # Client widget sw = client.shellwidget if greedy_completer_n in options: greedy_completer_o = self.get_option(greedy_completer_n) sw.set_greedy_completer(greedy_completer_o) if jedi_completer_n in options: jedi_completer_o = self.get_option(jedi_completer_n) sw.set_jedi_completer(jedi_completer_o) if autocall_n in options: autocall_o = self.get_option(autocall_n) sw.set_autocall(autocall_o) def _apply_pdb_plugin_settings(self, options, client): """Apply debugging configurations to a client.""" # Debugging options pdb_ignore_lib_n = 'pdb_ignore_lib' pdb_execute_events_n = 'pdb_execute_events' pdb_use_exclamation_mark_n = 'pdb_use_exclamation_mark' # Client widget sw = client.shellwidget if pdb_ignore_lib_n in options: pdb_ignore_lib_o = self.get_option(pdb_ignore_lib_n) sw.set_pdb_ignore_lib(pdb_ignore_lib_o) if pdb_execute_events_n in options: pdb_execute_events_o = self.get_option(pdb_execute_events_n) sw.set_pdb_execute_events(pdb_execute_events_o) if pdb_use_exclamation_mark_n in options: pdb_use_exclamation_mark_o = self.get_option( pdb_use_exclamation_mark_n) sw.set_pdb_use_exclamation_mark(pdb_use_exclamation_mark_o) def apply_plugin_settings_to_client( self, options, client, disconnect_ready_signal=False): """Apply given plugin settings to the given client.""" # GUI options self._apply_gui_plugin_settings(options, client) # Matplotlib options self._apply_mpl_plugin_settings(options, client) # Advanced options self._apply_advanced_plugin_settings(options, client) # Debugging options self._apply_pdb_plugin_settings(options, client) if disconnect_ready_signal: client.shellwidget.sig_pdb_prompt_ready.disconnect() def apply_plugin_settings(self, options): """Apply configuration file's plugin settings.""" restart_needed = False restart_options = [] # Startup options (needs a restart) run_lines_n = 'startup/run_lines' use_run_file_n = 'startup/use_run_file' run_file_n = 'startup/run_file' # Graphic options pylab_n = 'pylab' pylab_o = self.get_option(pylab_n) pylab_backend_n = 'pylab/backend' inline_backend = 0 pylab_restart = False client_backend_not_inline = [False] * len(self.clients) if pylab_o and pylab_backend_n in options: pylab_backend_o = self.get_option(pylab_backend_n) client_backend_not_inline = [ client.shellwidget.get_matplotlib_backend() != inline_backend for client in self.clients] current_client_backend_not_inline = ( self.get_current_client().shellwidget.get_matplotlib_backend() != inline_backend) pylab_restart = ( any(client_backend_not_inline) and pylab_backend_o != inline_backend) # Advanced options (needs a restart) symbolic_math_n = 'symbolic_math' hide_cmd_windows_n = 'hide_cmd_windows' restart_options += [run_lines_n, use_run_file_n, run_file_n, symbolic_math_n, hide_cmd_windows_n] restart_needed = any([restart_option in options for restart_option in restart_options]) if (restart_needed or pylab_restart) and not running_under_pytest(): restart_dialog = ConsoleRestartDialog(self) restart_dialog.exec_() (restart_all, restart_current, no_restart) = restart_dialog.get_action_value() else: restart_all = False restart_current = False no_restart = True # Apply settings for idx, client in enumerate(self.clients): restart = ((pylab_restart and client_backend_not_inline[idx]) or restart_needed) if not (restart and restart_all) or no_restart: sw = client.shellwidget if sw.is_debugging() and sw._executing: # Apply settings when the next Pdb prompt is available sw.sig_pdb_prompt_ready.connect( lambda o=options, c=client: self.apply_plugin_settings_to_client( o, c, disconnect_ready_signal=True) ) else: self.apply_plugin_settings_to_client(options, client) elif restart and restart_all: client.ask_before_restart = False client.restart_kernel() if (((pylab_restart and current_client_backend_not_inline) or restart_needed) and restart_current): current_client = self.get_current_client() current_client.ask_before_restart = False current_client.restart_kernel() def toggle_view(self, checked): """Toggle view""" if checked: self.dockwidget.show() self.dockwidget.raise_() # Start a client in case there are none shown if not self.clients: if self.main.is_setting_up: self.create_new_client(give_focus=False) else: self.create_new_client(give_focus=True) else: self.dockwidget.hide() #------ SpyderPluginWidget API -------------------------------------------- def get_plugin_title(self): """Return widget title""" return _('IPython console') def get_plugin_icon(self): """Return widget icon""" return ima.icon('ipython_console') def get_focus_widget(self): """ Return the widget to give focus to when this plugin's dockwidget is raised on top-level """ client = self.tabwidget.currentWidget() if client is not None: return client.get_control() def closing_plugin(self, cancelable=False): """Perform actions before parent main window is closed""" self.mainwindow_close = True for client in self.clients: client.shutdown() client.remove_stderr_file() client.dialog_manager.close_all() client.close() return True def refresh_plugin(self): """Refresh tabwidget""" client = None if self.tabwidget.count(): client = self.tabwidget.currentWidget() # Decide what to show for each client if client.info_page != client.blank_page: # Show info_page if it has content client.set_info_page() client.shellwidget.hide() client.layout.addWidget(self.infowidget) self.infowidget.show() else: self.infowidget.hide() client.shellwidget.show() # Give focus to the control widget of the selected tab control = client.get_control() control.setFocus() if isinstance(control, PageControlWidget): self.pager_label.show() else: self.pager_label.hide() # Create corner widgets buttons = [[b, -7] for b in client.get_toolbar_buttons()] buttons = sum(buttons, [])[:-1] widgets = [client.create_time_label()] + buttons else: control = None widgets = [] self.find_widget.set_editor(control) self.tabwidget.set_corner_widgets({Qt.TopRightCorner: widgets}) if client: sw = client.shellwidget self.main.variableexplorer.set_shellwidget(sw) self.sig_pdb_state_changed.emit( sw.is_waiting_pdb_input(), sw.get_pdb_last_step()) self.sig_shellwidget_changed.emit(sw) self.update_tabs_text() self.sig_update_plugin_title.emit() def get_plugin_actions(self): """Return a list of actions related to plugin.""" create_client_action = create_action( self, _("New console (default settings)"), icon=ima.icon('ipython_console'), triggered=self.create_new_client, context=Qt.WidgetWithChildrenShortcut) self.register_shortcut(create_client_action, context="ipython_console", name="New tab") create_pylab_action = create_action( self, _("New Pylab console (data plotting)"), icon=ima.icon('ipython_console'), triggered=self.create_pylab_client, context=Qt.WidgetWithChildrenShortcut) create_sympy_action = create_action( self, _("New SymPy console (symbolic math)"), icon=ima.icon('ipython_console'), triggered=self.create_sympy_client, context=Qt.WidgetWithChildrenShortcut) create_cython_action = create_action( self, _("New Cython console (Python with " "C extensions)"), icon=ima.icon('ipython_console'), triggered=self.create_cython_client, context=Qt.WidgetWithChildrenShortcut) special_console_action_group = QActionGroup(self) special_console_actions = (create_pylab_action, create_sympy_action, create_cython_action) add_actions(special_console_action_group, special_console_actions) special_console_menu = QMenu(_("New special console"), self) add_actions(special_console_menu, special_console_actions) restart_action = create_action(self, _("Restart kernel"), icon=ima.icon('restart'), triggered=self.restart_kernel, context=Qt.WidgetWithChildrenShortcut) reset_action = create_action(self, _("Remove all variables"), icon=ima.icon('editdelete'), triggered=self.reset_kernel, context=Qt.WidgetWithChildrenShortcut) self.register_shortcut(reset_action, context="ipython_console", name="Reset namespace") if self.interrupt_action is None: self.interrupt_action = create_action( self, _("Interrupt kernel"), icon=ima.icon('stop'), triggered=self.interrupt_kernel, context=Qt.WidgetWithChildrenShortcut) self.register_shortcut(restart_action, context="ipython_console", name="Restart kernel") connect_to_kernel_action = create_action(self, _("Connect to an existing kernel"), None, None, _("Open a new IPython console connected to an existing kernel"), triggered=self.create_client_for_kernel) rename_tab_action = create_action(self, _("Rename tab"), icon=ima.icon('rename'), triggered=self.tab_name_editor) # Add actions to main menus if self.add_actions_to_main_menus: console_menu = self.main.mainmenu.get_application_menu("consoles_menu") console_menu.aboutToShow.connect(self.update_execution_state_kernel) new_consoles_actions = [ create_client_action, special_console_menu, connect_to_kernel_action] restart_connect_consoles_actions = [ self.interrupt_action, restart_action, reset_action] for console_new_action in new_consoles_actions: self.main.mainmenu.add_item_to_application_menu( console_new_action, menu_id=ApplicationMenus.Consoles, section=ConsolesMenuSections.New, omit_id=True) for console_restart_connect_action in restart_connect_consoles_actions: self.main.mainmenu.add_item_to_application_menu( console_restart_connect_action, menu_id=ApplicationMenus.Consoles, section=ConsolesMenuSections.Restart, omit_id=True) # IPython documentation self.ipython_menu = SpyderMenu( parent=self, title=_("IPython documentation")) intro_action = create_action( self, _("Intro to IPython"), triggered=self.show_intro) quickref_action = create_action( self, _("Quick reference"), triggered=self.show_quickref) guiref_action = create_action( self, _("Console help"), triggered=self.show_guiref) add_actions( self.ipython_menu, (intro_action, guiref_action, quickref_action)) self.main.mainmenu.add_item_to_application_menu( self.ipython_menu, menu_id=ApplicationMenus.Help, section=HelpMenuSections.ExternalDocumentation, before_section=HelpMenuSections.About, omit_id=True) self.add_actions_to_main_menus = False # Plugin actions self.menu_actions = [create_client_action, special_console_menu, connect_to_kernel_action, MENU_SEPARATOR, self.interrupt_action, restart_action, reset_action, rename_tab_action] self.update_execution_state_kernel() # Check for a current client. Since it manages more actions. client = self.get_current_client() if client: return client.get_options_menu() return self.menu_actions def register_plugin(self): """Register plugin in Spyder's main window""" self.add_dockwidget() self.sig_focus_changed.connect(self.main.plugin_focus_changed) if self.main.editor: self.sig_edit_goto_requested.connect(self.main.editor.load) self.sig_edit_goto_requested[str, int, str, bool].connect( lambda fname, lineno, word, processevents: self.main.editor.load( fname, lineno, word, processevents=processevents)) self.main.editor.breakpoints_saved.connect( self.set_spyder_breakpoints) self.main.editor.run_in_current_ipyclient.connect(self.run_script) self.main.editor.run_cell_in_ipyclient.connect(self.run_cell) self.main.editor.debug_cell_in_ipyclient.connect(self.debug_cell) # Connect Editor debug action with Console self.sig_pdb_state_changed.connect( self.main.editor.update_pdb_state) self.main.editor.exec_in_extconsole.connect( self.execute_code_and_focus_editor) self.tabwidget.currentChanged.connect(self.update_working_directory) self.tabwidget.currentChanged.connect(self.check_pdb_state) self._remove_old_stderr_files() # Update kernels if python path is changed self.main.sig_pythonpath_changed.connect(self.update_path) # Show history file if no console is visible if not self._isvisible and self.main.historylog: self.main.historylog.add_history(get_conf_path('history.py')) #------ Public API (for clients) ------------------------------------------ def get_clients(self): """Return clients list""" return [cl for cl in self.clients if isinstance(cl, ClientWidget)] def get_focus_client(self): """Return current client with focus, if any""" widget = QApplication.focusWidget() for client in self.get_clients(): if widget is client or widget is client.get_control(): return client def get_current_client(self): """Return the currently selected client""" client = self.tabwidget.currentWidget() if client is not None: return client def get_current_shellwidget(self): """Return the shellwidget of the current client""" client = self.get_current_client() if client is not None: return client.shellwidget def run_script(self, filename, wdir, args, debug, post_mortem, current_client, clear_variables, console_namespace): """Run script in current or dedicated client""" norm = lambda text: remove_backslashes(to_text_string(text)) # Run Cython files in a dedicated console is_cython = osp.splitext(filename)[1] == '.pyx' if is_cython: current_client = False # Select client to execute code on it is_new_client = False if current_client: client = self.get_current_client() else: client = self.get_client_for_file(filename) if client is None: self.create_client_for_file(filename, is_cython=is_cython) client = self.get_current_client() is_new_client = True if client is not None: # If spyder-kernels, use runfile if client.shellwidget.is_spyder_kernel(): line = "%s('%s'" % ('debugfile' if debug else 'runfile', norm(filename)) if args: line += ", args='%s'" % norm(args) if wdir: line += ", wdir='%s'" % norm(wdir) if post_mortem: line += ", post_mortem=True" if console_namespace: line += ", current_namespace=True" line += ")" else: # External, non spyder-kernels, use %run line = "%run " if debug: line += "-d " line += "\"%s\"" % to_text_string(filename) if args: line += " %s" % norm(args) try: if client.shellwidget._executing: # Don't allow multiple executions when there's # still an execution taking place # Fixes spyder-ide/spyder#7293. pass elif current_client: self.execute_code(line, current_client, clear_variables) else: if is_new_client: client.shellwidget.silent_execute('%clear') else: client.shellwidget.execute('%clear') client.shellwidget.sig_prompt_ready.connect( lambda: self.execute_code(line, current_client, clear_variables)) except AttributeError: pass self.switch_to_plugin() else: #XXX: not sure it can really happen QMessageBox.warning(self, _('Warning'), _("No IPython console is currently available to run %s." "

Please open a new one and try again." ) % osp.basename(filename), QMessageBox.Ok) def run_cell(self, code, cell_name, filename, run_cell_copy, function='runcell'): """Run cell in current or dedicated client.""" def norm(text): return remove_backslashes(to_text_string(text)) self.run_cell_filename = filename # Select client to execute code on it client = self.get_client_for_file(filename) if client is None: client = self.get_current_client() if client is not None: # Internal kernels, use runcell if client.get_kernel() is not None and not run_cell_copy: line = (to_text_string( "{}({}, '{}')").format( to_text_string(function), repr(cell_name), norm(filename).replace("'", r"\'"))) # External kernels and run_cell_copy, just execute the code else: line = code.strip() try: self.execute_code(line) except AttributeError: pass self._visibility_changed(True) self.raise_() else: # XXX: not sure it can really happen QMessageBox.warning(self, _('Warning'), _("No IPython console is currently available " "to run {}.

Please open a new " "one and try again." ).format(osp.basename(filename)), QMessageBox.Ok) def debug_cell(self, code, cell_name, filename, run_cell_copy): """Debug current cell.""" self.run_cell(code, cell_name, filename, run_cell_copy, 'debugcell') def set_current_client_working_directory(self, directory): """Set current client working directory.""" shellwidget = self.get_current_shellwidget() if shellwidget is not None: shellwidget.set_cwd(directory) def set_working_directory(self, dirname): """Set current working directory. In the workingdirectory and explorer plugins. """ if osp.isdir(dirname): self.sig_current_directory_changed.emit(dirname) def update_working_directory(self): """Update working directory to console cwd.""" shellwidget = self.get_current_shellwidget() if shellwidget is not None: shellwidget.update_cwd() def update_path(self, path_dict, new_path_dict): """Update path on consoles.""" for client in self.get_clients(): shell = client.shellwidget if shell is not None: self.main.get_spyder_pythonpath() shell.update_syspath(path_dict, new_path_dict) def execute_code_and_focus_editor(self, lines, focus_to_editor=True): """ Execute lines in IPython console and eventually set focus to the Editor. """ console = self console.switch_to_plugin() console.execute_code(lines) if focus_to_editor and self.main.editor: self.main.editor.switch_to_plugin() def execute_code(self, lines, current_client=True, clear_variables=False): """Execute code instructions.""" sw = self.get_current_shellwidget() if sw is not None: if not current_client: # Clear console and reset namespace for # dedicated clients. # See spyder-ide/spyder#5748. try: sw.sig_prompt_ready.disconnect() except TypeError: pass sw.reset_namespace(warning=False) elif current_client and clear_variables: sw.reset_namespace(warning=False) # Needed to handle an error when kernel_client is none. # See spyder-ide/spyder#6308. try: sw.execute(to_text_string(lines)) except AttributeError: pass self.activateWindow() self.get_current_client().get_control().setFocus() def pdb_execute_command(self, command): """ Send command to the pdb kernel if possible. """ sw = self.get_current_shellwidget() if sw is not None: # Needed to handle an error when kernel_client is None. # See spyder-ide/spyder#7578. try: sw.pdb_execute_command(command) except AttributeError: pass def stop_debugging(self): """Stop debugging""" sw = self.get_current_shellwidget() if sw is not None: sw.stop_debugging() def get_pdb_state(self): """Get debugging state of the current console.""" sw = self.get_current_shellwidget() if sw is not None: return sw.is_waiting_pdb_input() return False def get_pdb_last_step(self): """Get last pdb step of the current console.""" sw = self.get_current_shellwidget() if sw is not None: return sw.get_pdb_last_step() return {} def check_pdb_state(self): """ Check if actions need to be taken checking the last pdb state. """ pdb_state = self.get_pdb_state() if pdb_state: pdb_last_step = self.get_pdb_last_step() sw = self.get_current_shellwidget() if 'fname' in pdb_last_step and sw is not None: fname = pdb_last_step['fname'] line = pdb_last_step['lineno'] self.pdb_has_stopped(fname, line, sw) @Slot() @Slot(bool) @Slot(str) @Slot(bool, str) @Slot(bool, bool) @Slot(bool, str, bool) def create_new_client(self, give_focus=True, filename='', is_cython=False, is_pylab=False, is_sympy=False, given_name=None): """Create a new client""" self.master_clients += 1 client_id = dict(int_id=to_text_string(self.master_clients), str_id='A') cf = self._new_connection_file() show_elapsed_time = self.get_option('show_elapsed_time') reset_warning = self.get_option('show_reset_namespace_warning') ask_before_restart = self.get_option('ask_before_restart') ask_before_closing = self.get_option('ask_before_closing') client = ClientWidget(self, id_=client_id, history_filename=get_conf_path('history.py'), config_options=self.config_options(), additional_options=self.additional_options( is_pylab=is_pylab, is_sympy=is_sympy), interpreter_versions=self.interpreter_versions(), connection_file=cf, menu_actions=self.menu_actions, options_button=self.options_button, show_elapsed_time=show_elapsed_time, reset_warning=reset_warning, given_name=given_name, ask_before_restart=ask_before_restart, ask_before_closing=ask_before_closing, css_path=self.css_path) # Change stderr_dir if requested if self.test_dir is not None: client.stderr_dir = self.test_dir self.add_tab(client, name=client.get_name(), filename=filename) if cf is None: error_msg = self.permission_error_msg.format(jupyter_runtime_dir()) client.show_kernel_error(error_msg) return # Check if ipykernel is present in the external interpreter. # Else we won't be able to create a client if not CONF.get('main_interpreter', 'default'): pyexec = CONF.get('main_interpreter', 'executable') has_spyder_kernels = programs.is_module_installed( 'spyder_kernels', interpreter=pyexec, version='>=2.1.0;<2.2.0') if not has_spyder_kernels and not running_under_pytest(): client.show_kernel_error( _("Your Python environment or installation doesn't have " "the spyder-kernels module or the right " "version of it installed (>= 2.1.0 and < 2.2.0). " "Without this module is not possible for Spyder to " "create a console for you.

" "You can install it by running in a system terminal:" "

" "conda install spyder-kernels=2.1" "

or

" "pip install spyder-kernels==2.1.*") ) return self.connect_client_to_kernel(client, is_cython=is_cython, is_pylab=is_pylab, is_sympy=is_sympy) if client.shellwidget.kernel_manager is None: return self.register_client(client, give_focus=give_focus) def create_pylab_client(self): """Force creation of Pylab client""" self.create_new_client(is_pylab=True, given_name="Pylab") def create_sympy_client(self): """Force creation of SymPy client""" self.create_new_client(is_sympy=True, given_name="SymPy") def create_cython_client(self): """Force creation of Cython client""" self.create_new_client(is_cython=True, given_name="Cython") @Slot() def create_client_for_kernel(self): """Create a client connected to an existing kernel""" connect_output = KernelConnectionDialog.get_connection_parameters(self) (connection_file, hostname, sshkey, password, ok) = connect_output if not ok: return else: self._create_client_for_kernel(connection_file, hostname, sshkey, password) def connect_client_to_kernel(self, client, is_cython=False, is_pylab=False, is_sympy=False): """Connect a client to its kernel""" connection_file = client.connection_file stderr_handle = None if self.test_no_stderr else client.stderr_handle km, kc = self.create_kernel_manager_and_kernel_client( connection_file, stderr_handle, is_cython=is_cython, is_pylab=is_pylab, is_sympy=is_sympy) # An error occurred if this is True if is_string(km) and kc is None: client.shellwidget.kernel_manager = None client.show_kernel_error(km) return # This avoids a recurrent, spurious NameError when running our # tests in our CIs if not self.testing: kc.started_channels.connect( lambda c=client: self.shellwidget_started(c)) kc.stopped_channels.connect( lambda c=client: self.shellwidget_deleted(c)) kc.start_channels(shell=True, iopub=True) shellwidget = client.shellwidget shellwidget.set_kernel_client_and_manager(kc, km) shellwidget.sig_exception_occurred.connect( self.sig_exception_occurred) @Slot(object, object) def edit_file(self, filename, line): """Handle %edit magic petitions.""" if encoding.is_text_file(filename): # The default line number sent by ipykernel is always the last # one, but we prefer to use the first. self.sig_edit_goto_requested.emit(filename, 1, '') def config_options(self): """ Generate a Trailets Config instance for shell widgets using our config system This lets us create each widget with its own config """ # ---- Jupyter config ---- try: full_cfg = load_pyconfig_files(['jupyter_qtconsole_config.py'], jupyter_config_dir()) # From the full config we only select the JupyterWidget section # because the others have no effect here. cfg = Config({'JupyterWidget': full_cfg.JupyterWidget}) except: cfg = Config() # ---- Spyder config ---- spy_cfg = Config() # Make the pager widget a rich one (i.e a QTextEdit) spy_cfg.JupyterWidget.kind = 'rich' # Gui completion widget completion_type_o = self.get_option('completion_type') completions = {0: "droplist", 1: "ncurses", 2: "plain"} spy_cfg.JupyterWidget.gui_completion = completions[completion_type_o] # Calltips calltips_o = self.get_option('show_calltips') spy_cfg.JupyterWidget.enable_calltips = calltips_o # Buffer size buffer_size_o = self.get_option('buffer_size') spy_cfg.JupyterWidget.buffer_size = buffer_size_o # Prompts in_prompt_o = self.get_option('in_prompt') out_prompt_o = self.get_option('out_prompt') if in_prompt_o: spy_cfg.JupyterWidget.in_prompt = in_prompt_o if out_prompt_o: spy_cfg.JupyterWidget.out_prompt = out_prompt_o # Style color_scheme = CONF.get('appearance', 'selected') style_sheet = create_qss_style(color_scheme)[0] spy_cfg.JupyterWidget.style_sheet = style_sheet spy_cfg.JupyterWidget.syntax_style = color_scheme # Merge QtConsole and Spyder configs. Spyder prefs will have # prevalence over QtConsole ones cfg._merge(spy_cfg) return cfg def interpreter_versions(self): """Python and IPython versions used by clients""" if CONF.get('main_interpreter', 'default'): from IPython.core import release versions = dict( python_version = sys.version, ipython_version = release.version ) else: import subprocess versions = {} pyexec = CONF.get('main_interpreter', 'executable') py_cmd = u'%s -c "import sys; print(sys.version)"' % pyexec ipy_cmd = ( u'%s -c "import IPython.core.release as r; print(r.version)"' % pyexec ) for cmd in [py_cmd, ipy_cmd]: if PY2: # We need to encode as run_shell_command will treat the # string as str cmd = cmd.encode('utf-8') try: # Use clean environment proc = programs.run_shell_command(cmd, env={}) output, _err = proc.communicate() except subprocess.CalledProcessError: output = '' output = output.decode().split('\n')[0].strip() if 'IPython' in cmd: versions['ipython_version'] = output else: versions['python_version'] = output return versions def additional_options(self, is_pylab=False, is_sympy=False): """ Additional options for shell widgets that are not defined in JupyterWidget config options """ options = dict( pylab=self.get_option('pylab'), autoload_pylab=self.get_option('pylab/autoload'), sympy=self.get_option('symbolic_math'), show_banner=self.get_option('show_banner') ) if is_pylab is True: options['autoload_pylab'] = True options['sympy'] = False if is_sympy is True: options['autoload_pylab'] = False options['sympy'] = True return options def register_client(self, client, give_focus=True): """Register new client""" client.configure_shellwidget(give_focus=give_focus) # Local vars shellwidget = client.shellwidget control = shellwidget._control # Create new clients with Ctrl+T shortcut shellwidget.new_client.connect(self.create_new_client) # For tracebacks control.sig_go_to_error_requested.connect(self.go_to_error) # For help requests control.sig_help_requested.connect(self.sig_help_requested) shellwidget.sig_pdb_step.connect( lambda fname, lineno, shellwidget=shellwidget: self.pdb_has_stopped(fname, lineno, shellwidget)) shellwidget.sig_pdb_state_changed.connect(self.sig_pdb_state_changed) # To handle %edit magic petitions shellwidget.custom_edit_requested.connect(self.edit_file) # Set shell cwd according to preferences cwd_path = '' if CONF.get('workingdir', 'console/use_project_or_home_directory'): cwd_path = get_home_dir() if (self.main.projects is not None and self.main.projects.get_active_project() is not None): cwd_path = self.main.projects.get_active_project_path() elif CONF.get('workingdir', 'startup/use_fixed_directory'): cwd_path = CONF.get('workingdir', 'startup/fixed_directory', default=get_home_dir()) elif CONF.get('workingdir', 'console/use_fixed_directory'): cwd_path = CONF.get('workingdir', 'console/fixed_directory') if osp.isdir(cwd_path) and self.main is not None: shellwidget.set_cwd(cwd_path) if give_focus: # Syncronice cwd with explorer and cwd widget shellwidget.update_cwd() # Connect client to our history log if self.main.historylog is not None: self.main.historylog.add_history(client.history_filename) client.sig_append_to_history_requested.connect( self.main.historylog.append_to_history) # Set font for client client.set_font(self.get_font()) # Set editor for the find widget self.find_widget.set_editor(control) # Connect to working directory shellwidget.sig_working_directory_changed.connect( self.set_working_directory) def close_client(self, index=None, client=None, force=False): """Close client tab from index or widget (or close current tab)""" if not self.tabwidget.count(): return if client is not None: if client not in self.clients: # Client already closed return index = self.tabwidget.indexOf(client) # if index is not found in tabwidget it's because this client was # already closed and the call was performed by the exit callback if index == -1: return if index is None and client is None: index = self.tabwidget.currentIndex() if index is not None: client = self.tabwidget.widget(index) # Needed to handle a RuntimeError. See spyder-ide/spyder#5568. try: # Close client client.stop_button_click_handler() except RuntimeError: pass # Disconnect timer needed to update elapsed time try: client.timer.timeout.disconnect(client.show_time) except (RuntimeError, TypeError): pass # Check if related clients or kernels are opened # and eventually ask before closing them if not self.mainwindow_close and not force: close_all = True if client.ask_before_closing: close = QMessageBox.question(self, self.get_plugin_title(), _("Do you want to close this console?"), QMessageBox.Yes | QMessageBox.No) if close == QMessageBox.No: return if len(self.get_related_clients(client)) > 0: close_all = QMessageBox.question(self, self.get_plugin_title(), _("Do you want to close all other consoles connected " "to the same kernel as this one?"), QMessageBox.Yes | QMessageBox.No) client.shutdown() if close_all == QMessageBox.Yes: self.close_related_clients(client) # if there aren't related clients we can remove stderr_file related_clients = self.get_related_clients(client) if len(related_clients) == 0: client.remove_stderr_file() client.dialog_manager.close_all() client.close() # Note: client index may have changed after closing related widgets self.tabwidget.removeTab(self.tabwidget.indexOf(client)) self.clients.remove(client) # This is needed to prevent that hanged consoles make reference # to an index that doesn't exist. See spyder-ide/spyder#4881 try: self.filenames.pop(index) except IndexError: pass self.update_tabs_text() # Create a new client if the console is about to become empty if not self.tabwidget.count() and self.create_new_client_if_empty: self.create_new_client() self.sig_update_plugin_title.emit() def get_client_index_from_id(self, client_id): """Return client index from id""" for index, client in enumerate(self.clients): if id(client) == client_id: return index def get_related_clients(self, client): """ Get all other clients that are connected to the same kernel as `client` """ related_clients = [] for cl in self.get_clients(): if cl.connection_file == client.connection_file and \ cl is not client: related_clients.append(cl) return related_clients def close_related_clients(self, client): """Close all clients related to *client*, except itself""" related_clients = self.get_related_clients(client) for cl in related_clients: self.close_client(client=cl, force=True) def restart(self): """ Restart the console This is needed when we switch projects to update PYTHONPATH and the selected interpreter """ self.master_clients = 0 self.create_new_client_if_empty = False for i in range(len(self.clients)): client = self.clients[-1] try: client.shutdown() except Exception as e: QMessageBox.warning(self, _('Warning'), _("It was not possible to restart the IPython console " "when switching to this project. The error was

" "{0}").format(e), QMessageBox.Ok) self.close_client(client=client, force=True) self.create_new_client(give_focus=False) self.create_new_client_if_empty = True def pdb_has_stopped(self, fname, lineno, shellwidget): """Python debugger has just stopped at frame (fname, lineno)""" # This is a unique form of the sig_edit_goto_requested signal that # is intended to prevent keyboard input from accidentally entering the # editor during repeated, rapid entry of debugging commands. self.sig_edit_goto_requested[str, int, str, bool].emit( fname, lineno, '', False) self.activateWindow() shellwidget._control.setFocus() def set_spyder_breakpoints(self): """Set Spyder breakpoints into all clients""" for cl in self.clients: cl.shellwidget.set_spyder_breakpoints() @Slot(str) def create_client_from_path(self, path): """Create a client with its cwd pointing to path.""" self.create_new_client() sw = self.get_current_shellwidget() sw.set_cwd(path) def create_client_for_file(self, filename, is_cython=False): """Create a client to execute code related to a file.""" # Create client self.create_new_client(filename=filename, is_cython=is_cython) # Don't increase the count of master clients self.master_clients -= 1 # Rename client tab with filename client = self.get_current_client() client.allow_rename = False tab_text = self.disambiguate_fname(filename) self.rename_client_tab(client, tab_text) def get_client_for_file(self, filename): """Get client associated with a given file.""" client = None for idx, cl in enumerate(self.get_clients()): if self.filenames[idx] == filename: self.tabwidget.setCurrentIndex(idx) client = cl break return client def set_elapsed_time(self, client): """Set elapsed time for slave clients.""" related_clients = self.get_related_clients(client) for cl in related_clients: if cl.timer is not None: client.create_time_label() client.t0 = cl.t0 client.timer.timeout.connect(client.show_time) client.timer.start(1000) break def set_infowidget_font(self): """Set font for infowidget""" font = get_font(option='rich_font') self.infowidget.set_font(font) #------ Public API (for kernels) ------------------------------------------ def ssh_tunnel(self, *args, **kwargs): if os.name == 'nt': return zmqtunnel.paramiko_tunnel(*args, **kwargs) else: return openssh_tunnel(self, *args, **kwargs) def tunnel_to_kernel(self, connection_info, hostname, sshkey=None, password=None, timeout=10): """ Tunnel connections to a kernel via ssh. Remote ports are specified in the connection info ci. """ lports = zmqtunnel.select_random_ports(4) rports = (connection_info['shell_port'], connection_info['iopub_port'], connection_info['stdin_port'], connection_info['hb_port']) remote_ip = connection_info['ip'] for lp, rp in zip(lports, rports): self.ssh_tunnel(lp, rp, hostname, remote_ip, sshkey, password, timeout) return tuple(lports) def create_kernel_spec(self, is_cython=False, is_pylab=False, is_sympy=False): """Create a kernel spec for our own kernels""" # Before creating our kernel spec, we always need to # set this value in spyder.ini CONF.set('main', 'spyder_pythonpath', self.main.get_spyder_pythonpath()) return SpyderKernelSpec(is_cython=is_cython, is_pylab=is_pylab, is_sympy=is_sympy) def create_kernel_manager_and_kernel_client(self, connection_file, stderr_handle, is_cython=False, is_pylab=False, is_sympy=False): """Create kernel manager and client.""" # Kernel spec kernel_spec = self.create_kernel_spec(is_cython=is_cython, is_pylab=is_pylab, is_sympy=is_sympy) # Kernel manager try: kernel_manager = SpyderKernelManager( connection_file=connection_file, config=None, autorestart=True, ) except Exception: error_msg = _("The error is:

" "{}").format(traceback.format_exc()) return (error_msg, None) kernel_manager._kernel_spec = kernel_spec # Catch any error generated when trying to start the kernel. # See spyder-ide/spyder#7302. try: kernel_manager.start_kernel(stderr=stderr_handle, env=kernel_spec.env) except Exception: error_msg = _("The error is:

" "{}").format(traceback.format_exc()) return (error_msg, None) # Kernel client kernel_client = kernel_manager.client() # Increase time (in seconds) to detect if a kernel is alive. # See spyder-ide/spyder#3444. kernel_client.hb_channel.time_to_dead = 25.0 return kernel_manager, kernel_client def restart_kernel(self): """Restart kernel of current client.""" client = self.get_current_client() if client is not None: self.switch_to_plugin() client.restart_kernel() def reset_kernel(self): """Reset kernel of current client.""" client = self.get_current_client() if client is not None: self.switch_to_plugin() client.reset_namespace() def interrupt_kernel(self): """Interrupt kernel of current client.""" client = self.get_current_client() if client is not None: self.switch_to_plugin() client.stop_button_click_handler() def update_execution_state_kernel(self): """Update actions following the execution state of the kernel.""" client = self.get_current_client() if client is not None: executing = client.stop_button.isEnabled() self.interrupt_action.setEnabled(executing) def connect_external_kernel(self, shellwidget): """ Connect an external kernel to the Variable Explorer, Help and Plots, but only if it is a Spyder kernel. """ sw = shellwidget kc = shellwidget.kernel_client self.sig_shellwidget_changed.emit(sw) if self.main.variableexplorer is not None: self.main.variableexplorer.add_shellwidget(sw) sw.set_namespace_view_settings() sw.refresh_namespacebrowser() kc.stopped_channels.connect(lambda : self.main.variableexplorer.remove_shellwidget(id(sw))) if self.main.plots is not None: self.main.plots.add_shellwidget(sw) kc.stopped_channels.connect(lambda : self.main.plots.remove_shellwidget(id(sw))) #------ Public API (for tabs) --------------------------------------------- def add_tab(self, widget, name, filename=''): """Add tab""" self.clients.append(widget) index = self.tabwidget.addTab(widget, name) self.filenames.insert(index, filename) self.tabwidget.setCurrentIndex(index) if self.dockwidget and not self.main.is_setting_up: self.switch_to_plugin() self.activateWindow() widget.get_control().setFocus() self.update_tabs_text() def move_tab(self, index_from, index_to): """ Move tab (tabs themselves have already been moved by the tabwidget) """ filename = self.filenames.pop(index_from) client = self.clients.pop(index_from) self.filenames.insert(index_to, filename) self.clients.insert(index_to, client) self.update_tabs_text() self.sig_update_plugin_title.emit() def disambiguate_fname(self, fname): """Generate a file name without ambiguation.""" files_path_list = [filename for filename in self.filenames if filename] return sourcecode.disambiguate_fname(files_path_list, fname) def update_tabs_text(self): """Update the text from the tabs.""" # This is needed to prevent that hanged consoles make reference # to an index that doesn't exist. See spyder-ide/spyder#4881. try: for index, fname in enumerate(self.filenames): client = self.clients[index] if fname: self.rename_client_tab(client, self.disambiguate_fname(fname)) else: self.rename_client_tab(client, None) except IndexError: pass def rename_client_tab(self, client, given_name): """Rename client's tab""" index = self.get_client_index_from_id(id(client)) if given_name is not None: client.given_name = given_name self.tabwidget.setTabText(index, client.get_name()) def rename_tabs_after_change(self, given_name): """Rename tabs after a change in name.""" client = self.get_current_client() # Prevent renames that want to assign the same name of # a previous tab repeated = False for cl in self.get_clients(): if id(client) != id(cl) and given_name == cl.given_name: repeated = True break # Rename current client tab to add str_id if client.allow_rename and not u'/' in given_name and not repeated: self.rename_client_tab(client, given_name) else: self.rename_client_tab(client, None) # Rename related clients if client.allow_rename and not u'/' in given_name and not repeated: for cl in self.get_related_clients(client): self.rename_client_tab(cl, given_name) def tab_name_editor(self): """Trigger the tab name editor.""" index = self.tabwidget.currentIndex() self.tabwidget.tabBar().tab_name_editor.edit_tab(index) #------ Public API (for help) --------------------------------------------- def go_to_error(self, text): """Go to error if relevant""" match = get_error_match(to_text_string(text)) if match: fname, lnb = match.groups() if ("= (6, 1): return import asyncio try: from asyncio import ( WindowsProactorEventLoopPolicy, WindowsSelectorEventLoopPolicy, ) except ImportError: # not affected pass else: if isinstance( asyncio.get_event_loop_policy(), WindowsProactorEventLoopPolicy): # WindowsProactorEventLoopPolicy is not compatible # with tornado 6 fallback to the pre-3.8 # default of Selector asyncio.set_event_loop_policy( WindowsSelectorEventLoopPolicy()) def _new_connection_file(self): """ Generate a new connection file Taken from jupyter_client/console_app.py Licensed under the BSD license """ # Check if jupyter_runtime_dir exists (Spyder addition) if not osp.isdir(jupyter_runtime_dir()): try: os.makedirs(jupyter_runtime_dir()) except (IOError, OSError): return None cf = '' while not cf: ident = str(uuid.uuid4()).split('-')[-1] cf = os.path.join(jupyter_runtime_dir(), 'kernel-%s.json' % ident) cf = cf if not os.path.exists(cf) else '' return cf def shellwidget_started(self, client): if self.main.variableexplorer is not None: self.main.variableexplorer.add_shellwidget(client.shellwidget) self.sig_shellwidget_created.emit(client.shellwidget) def shellwidget_deleted(self, client): if self.main.variableexplorer is not None: self.main.variableexplorer.remove_shellwidget(client.shellwidget) self.sig_shellwidget_deleted.emit(client.shellwidget) def _create_client_for_kernel(self, connection_file, hostname, sshkey, password): # Verifying if the connection file exists try: cf_path = osp.dirname(connection_file) cf_filename = osp.basename(connection_file) # To change a possible empty string to None cf_path = cf_path if cf_path else None connection_file = find_connection_file(filename=cf_filename, path=cf_path) except (IOError, UnboundLocalError): QMessageBox.critical(self, _('IPython'), _("Unable to connect to " "%s") % connection_file) return # Getting the master id that corresponds to the client # (i.e. the i in i/A) master_id = None given_name = None external_kernel = False slave_ord = ord('A') - 1 kernel_manager = None for cl in self.get_clients(): if connection_file in cl.connection_file: if cl.get_kernel() is not None: kernel_manager = cl.get_kernel() connection_file = cl.connection_file if master_id is None: master_id = cl.id_['int_id'] given_name = cl.given_name new_slave_ord = ord(cl.id_['str_id']) if new_slave_ord > slave_ord: slave_ord = new_slave_ord # If we couldn't find a client with the same connection file, # it means this is a new master client if master_id is None: self.master_clients += 1 master_id = to_text_string(self.master_clients) external_kernel = True # Set full client name client_id = dict(int_id=master_id, str_id=chr(slave_ord + 1)) # Creating the client show_elapsed_time = self.get_option('show_elapsed_time') reset_warning = self.get_option('show_reset_namespace_warning') ask_before_restart = self.get_option('ask_before_restart') client = ClientWidget(self, id_=client_id, given_name=given_name, history_filename=get_conf_path('history.py'), config_options=self.config_options(), additional_options=self.additional_options(), interpreter_versions=self.interpreter_versions(), connection_file=connection_file, menu_actions=self.menu_actions, hostname=hostname, external_kernel=external_kernel, slave=True, show_elapsed_time=show_elapsed_time, reset_warning=reset_warning, ask_before_restart=ask_before_restart, css_path=self.css_path) # Change stderr_dir if requested if self.test_dir is not None: client.stderr_dir = self.test_dir # Create kernel client kernel_client = QtKernelClient(connection_file=connection_file) # This is needed for issue spyder-ide/spyder#9304. try: kernel_client.load_connection_file() except Exception as e: QMessageBox.critical(self, _('Connection error'), _("An error occurred while trying to load " "the kernel connection file. The error " "was:\n\n") + to_text_string(e)) return if hostname is not None: try: connection_info = dict(ip = kernel_client.ip, shell_port = kernel_client.shell_port, iopub_port = kernel_client.iopub_port, stdin_port = kernel_client.stdin_port, hb_port = kernel_client.hb_port) newports = self.tunnel_to_kernel(connection_info, hostname, sshkey, password) (kernel_client.shell_port, kernel_client.iopub_port, kernel_client.stdin_port, kernel_client.hb_port) = newports # Save parameters to connect comm later kernel_client.ssh_parameters = (hostname, sshkey, password) except Exception as e: QMessageBox.critical(self, _('Connection error'), _("Could not open ssh tunnel. The " "error was:\n\n") + to_text_string(e)) return # Assign kernel manager and client to shellwidget kernel_client.start_channels() shellwidget = client.shellwidget shellwidget.set_kernel_client_and_manager( kernel_client, kernel_manager) shellwidget.sig_exception_occurred.connect( self.sig_exception_occurred) if external_kernel: shellwidget.sig_is_spykernel.connect( self.connect_external_kernel) shellwidget.check_spyder_kernel() # Set elapsed time, if possible if not external_kernel: self.set_elapsed_time(client) # Adding a new tab for the client self.add_tab(client, name=client.get_name()) # Register client self.register_client(client) def _remove_old_stderr_files(self): """ Remove stderr files left by previous Spyder instances. This is only required on Windows because we can't clean up stderr files while Spyder is running on it. """ if os.name == 'nt': tmpdir = get_temp_dir() for fname in os.listdir(tmpdir): if osp.splitext(fname)[1] == '.stderr': try: os.remove(osp.join(tmpdir, fname)) except Exception: pass def print_debug_file_msg(self): """Print message in the current console when a file can't be closed.""" debug_msg = _('The current file cannot be closed because it is ' 'in debug mode.') self.get_current_client().shellwidget.append_html_message( debug_msg, before_prompt=True) # TODO: To be updated after migration def on_mainwindow_visible(self): return