#----------------------------------------------------------------------------- # Copyright (c) 2012 - 2021, Anaconda, Inc., and Bokeh Contributors. # All rights reserved. # # The full license is in the file LICENSE.txt, distributed with this software. #----------------------------------------------------------------------------- ''' Encapulate the management of Document callbacks with a DocumentCallbackManager class. ''' #----------------------------------------------------------------------------- # Boilerplate #----------------------------------------------------------------------------- from __future__ import annotations import logging # isort:skip log = logging.getLogger(__name__) #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- # Standard library imports import weakref from collections import defaultdict from functools import wraps from typing import ( TYPE_CHECKING, Any, Callable, Dict, List, Set, Type, ) # Bokeh imports from ..core.enums import HoldPolicy, HoldPolicyType from ..core.types import Unknown from ..events import ( _CONCRETE_EVENT_CLASSES, DocumentEvent, Event, ModelEvent, ) from ..model import Model from ..util.callback_manager import _check_callback from .events import ( # RootAddedEvent,; RootRemovedEvent,; TitleChangedEvent, DocumentPatchedEvent, ModelChangedEvent, SessionCallbackAdded, SessionCallbackRemoved, ) from .locking import UnlockedDocumentProxy if TYPE_CHECKING: from ..application.application import SessionDestroyedCallback from ..core.has_props import Setter from ..events import EventJson from ..server.callbacks import SessionCallback from .document import Document from .events import DocumentChangeCallback, DocumentChangedEvent, Invoker #----------------------------------------------------------------------------- # Globals and constants #----------------------------------------------------------------------------- __all__ = ( 'DocumentCallbackManager', 'invoke_with_curdoc', ) Callback = Callable[[], None] Originator = Callable[..., Any] MessageCallback = Callable[[Unknown], None] EventCallback = Callable[[Event], None] #----------------------------------------------------------------------------- # General API #----------------------------------------------------------------------------- class DocumentCallbackManager: ''' Manage and provide access to all of the models that belong to a Bokeh Document. The set of "all models" means specifically all the models reachable from references form a Document's roots. ''' _document : weakref.ReferenceType[Document] _change_callbacks: Dict[Any, DocumentChangeCallback] _event_callbacks: Dict[str, List[EventCallback]] _message_callbacks: Dict[str, List[MessageCallback]] _session_destroyed_callbacks: Set[SessionDestroyedCallback] _session_callbacks: Set[SessionCallback] _subscribed_models: Dict[str, Set[weakref.ReferenceType[Model]]] _hold: HoldPolicyType | None = None _held_events: List[DocumentChangedEvent] def __init__(self, document: Document): ''' Args: document (Document): A Document to manage models for A weak reference to the Document will be retained ''' self._document = weakref.ref(document) self._change_callbacks = {} self._event_callbacks = defaultdict(list) self._message_callbacks = defaultdict(list) self._session_destroyed_callbacks = set() self._session_callbacks = set() self._subscribed_models = defaultdict(set) self._hold = None self._held_events = [] self.on_message("bokeh_event", self.trigger_json_event) @property def session_callbacks(self) -> List[SessionCallback]: ''' A list of all the session callbacks for this document. ''' return list(self._session_callbacks) @property def session_destroyed_callbacks(self) -> Set[SessionDestroyedCallback]: ''' A list of all the on_session_destroyed callbacks for this document. ''' return self._session_destroyed_callbacks @session_destroyed_callbacks.setter def session_destroyed_callbacks(self, callbacks: Set[SessionDestroyedCallback]) -> None: self._session_destroyed_callbacks = callbacks def add_session_callback(self, callback_obj: SessionCallback, callback: Callback, one_shot: bool) -> SessionCallback: ''' Internal implementation for adding session callbacks. Args: callback_obj (SessionCallback) : A session callback object that wraps a callable and is passed to ``trigger_on_change``. callback (callable) : A callable to execute when session events happen. one_shot (bool) : Whether the callback should immediately auto-remove itself after one execution. Returns: SessionCallback : passed in as ``callback_obj``. Raises: ValueError, if the callback has been previously added ''' doc = self._document() if doc is None: raise RuntimeError("Attempting to add session callback to already-destroyed Document") if one_shot: @wraps(callback) def remove_then_invoke() -> None: if callback_obj in self._session_callbacks: self.remove_session_callback(callback_obj) return callback() actual_callback = remove_then_invoke else: actual_callback = callback callback_obj._callback = _wrap_with_curdoc(doc, actual_callback) self._session_callbacks.add(callback_obj) # emit event so the session is notified of the new callback self.trigger_on_change(SessionCallbackAdded(doc, callback_obj)) return callback_obj def destroy(self) -> None: ''' Clean up references to the Documents models ''' self._change_callbacks.clear() del self._change_callbacks self._event_callbacks.clear() del self._event_callbacks self._message_callbacks.clear() del self._message_callbacks def hold(self, policy: HoldPolicyType = "combine") -> None: if self._hold is not None and self._hold != policy: log.warning(f"hold already active with '{self._hold}', ignoring '{policy}'") return if policy not in HoldPolicy: raise ValueError(f"Unknown hold policy {policy}") self._hold = policy @property def hold_value(self) -> HoldPolicyType | None: return self._hold def notify_change(self, model: Model, attr: str, old: Unknown, new: Unknown, hint: DocumentPatchedEvent | None = None, setter: Setter | None = None, callback_invoker: Invoker | None = None) -> None: ''' Called by Model when it changes ''' doc = self._document() if doc is None: return # if name changes, need to update by-name index if attr == 'name': doc.models.update_name(model, old, new) if hint is None: serializable_new = model.lookup(attr).serializable_value(model) else: serializable_new = None event = ModelChangedEvent(doc, model, attr, old, new, serializable_new, hint, setter, callback_invoker) self.trigger_on_change(event) def notify_event(self, model: Model, event: ModelEvent, callback_invoker: Invoker) -> None: ''' ''' doc = self._document() if doc is None: return # TODO (bev): use internal event here to dispatch, rather than hard-coding invocation here invoke_with_curdoc(doc, callback_invoker) def on_change(self, *callbacks: DocumentChangeCallback) -> None: ''' Provide callbacks to invoke if the document or any Model reachable from its roots changes. ''' for callback in callbacks: if callback in self._change_callbacks: continue _check_callback(callback, ('event',)) self._change_callbacks[callback] = callback def on_change_dispatch_to(self, receiver: Any) -> None: if not receiver in self._change_callbacks: self._change_callbacks[receiver] = lambda event: event.dispatch(receiver) def on_event(self, event: str | Type[Event], *callbacks: EventCallback) -> None: ''' Provide callbacks to invoke if a bokeh event is received. ''' if not isinstance(event, str) and issubclass(event, Event): event = event.event_name if event not in _CONCRETE_EVENT_CLASSES: raise ValueError(f"Unknown event {event}") if not issubclass(_CONCRETE_EVENT_CLASSES[event], DocumentEvent): raise ValueError("Document.on_event may only be used to subscribe " "to events of type DocumentEvent. To subscribe " "to a ModelEvent use the Model.on_event method.") for callback in callbacks: _check_callback(callback, ('event',), what='Event callback') self._event_callbacks[event].extend(callbacks) def on_message(self, msg_type: str, *callbacks: MessageCallback) -> None: self._message_callbacks[msg_type].extend(callbacks) def on_session_destroyed(self, *callbacks: SessionDestroyedCallback) -> None: ''' Provide callbacks to invoke when the session serving the Document is destroyed ''' for callback in callbacks: _check_callback(callback, ('session_context',)) self._session_destroyed_callbacks.add(callback) def remove_on_change(self, *callbacks: Any) -> None: ''' Remove a callback added earlier with ``on_change``. Raises: KeyError, if the callback was never added ''' for callback in callbacks: del self._change_callbacks[callback] def remove_on_message(self, msg_type: str, callback: MessageCallback) -> None: ''' ''' message_callbacks = self._message_callbacks.get(msg_type, None) if message_callbacks is not None and callback in message_callbacks: message_callbacks.remove(callback) def remove_session_callback(self, callback_obj: SessionCallback) -> None: ''' Remove a callback added earlier with ``add_periodic_callback``, ``add_timeout_callback``, or ``add_next_tick_callback``. Returns: None Raises: KeyError, if the callback was never added ''' try: callback_objs = [callback_obj] self._session_callbacks.remove(callback_obj) except KeyError: raise ValueError("callback already ran or was already removed, cannot be removed again") doc = self._document() if doc is None: return # emit event so the session is notified and can remove the callback for callback_obj in callback_objs: self.trigger_on_change(SessionCallbackRemoved(doc, callback_obj)) def subscribe(self, key: str, model: Model) -> None: self._subscribed_models[key].add(weakref.ref(model)) def trigger_json_event(self, json: EventJson) -> None: try: event = Event.decode_json(json) except ValueError: log.warning('Could not decode event json: %s' % json) # This is fairly gorpy, we are not being careful with model vs doc events, etc. if isinstance(event, ModelEvent): subscribed = self._subscribed_models[event.event_name].copy() for model_ref in subscribed: model = model_ref() if model: model._trigger_event(event) for cb in self._event_callbacks.get(event.event_name, []): cb(event) def trigger_on_change(self, event: DocumentChangedEvent) -> None: doc = self._document() if doc is None: return if self._hold == "collect": self._held_events.append(event) return elif self._hold == "combine": _combine_document_events(event, self._held_events) return if event.callback_invoker is not None: invoke_with_curdoc(doc, event.callback_invoker) def invoke_callbacks() -> None: for cb in self._change_callbacks.values(): cb(event) invoke_with_curdoc(doc, invoke_callbacks) def unhold(self) -> None: ''' Turn off any active document hold and apply any collected events. Returns: None ''' # no-op if we are already no holding if self._hold is None: return self._hold = None events = list(self._held_events) self._held_events = [] for event in events: self.trigger_on_change(event) #----------------------------------------------------------------------------- # Dev API #----------------------------------------------------------------------------- def invoke_with_curdoc(doc: Document, f: Callable[[], None]) -> None: from ..io.doc import patch_curdoc curdoc: Document|UnlockedDocumentProxy = UnlockedDocumentProxy(doc) if getattr(f, "nolock", False) else doc with patch_curdoc(curdoc): return f() #----------------------------------------------------------------------------- # Private API #----------------------------------------------------------------------------- def _combine_document_events(new_event: DocumentChangedEvent, old_events: List[DocumentChangedEvent]) -> None: ''' Attempt to combine a new event with a list of previous events. The ``old_event`` will be scanned in reverse, and ``.combine(new_event)`` will be called on each. If a combination can be made, the function will return immediately. Otherwise, ``new_event`` will be appended to ``old_events``. Args: new_event (DocumentChangedEvent) : The new event to attempt to combine old_events (list[DocumentChangedEvent]) A list of previous events to attempt to combine new_event with **This is an "out" parameter**. The values it contains will be modified in-place. Returns: None ''' for event in reversed(old_events): if event.combine(new_event): return # no combination was possible old_events.append(new_event) def _wrap_with_curdoc(doc: Document, f: Callable[..., Any]) -> Callable[..., Any]: @wraps(f) def wrapper(*args: Any, **kwargs: Any) -> None: @wraps(f) def invoke() -> Any: return f(*args, **kwargs) return invoke_with_curdoc(doc, invoke) return wrapper #----------------------------------------------------------------------------- # Code #-----------------------------------------------------------------------------