""" Declares Syncable and Reactive classes which provides baseclasses for Panel components which sync their state with one or more bokeh models rendered on the frontend. """ import difflib import datetime as dt import logging import re import sys import textwrap from collections import Counter, defaultdict, namedtuple from functools import partial import bleach import numpy as np import param from bokeh.model import DataModel from param.parameterized import ParameterizedMetaclass, Watcher from pprint import pformat from .io.document import unlocked from .io.model import hold from .io.notebook import push from .io.state import set_curdoc, state from .models.reactive_html import ( ReactiveHTML as _BkReactiveHTML, ReactiveHTMLParser ) from .util import edit_readonly, escape, updating from .viewable import Layoutable, Renderable, Viewable log = logging.getLogger('panel.reactive') LinkWatcher = namedtuple("Watcher", Watcher._fields+('target', 'links', 'transformed', 'bidirectional_watcher')) class Syncable(Renderable): """ Syncable is an extension of the Renderable object which can not only render to a bokeh model but also sync the parameters on the object with the properties on the model. In order to bi-directionally link parameters with bokeh model instances the _link_params and _link_props methods define callbacks triggered when either the parameter or bokeh property values change. Since there may not be a 1-to-1 mapping between parameter and the model property the _process_property_change and _process_param_change may be overridden to apply any necessary transformations. """ # Timeout if a notebook comm message is swallowed _timeout = 20000 # Timeout before the first event is processed _debounce = 50 # Property changes which should not be debounced _priority_changes = [] # Any parameters that require manual updates handling for the models # e.g. parameters which affect some sub-model _manual_params = [] # Mapping from parameter name to bokeh model property name _rename = {} # Allows defining a mapping from model property name to a JS code # snippet that transforms the object before serialization _js_transforms = {} # Transforms from input value to bokeh property value _source_transforms = {} _target_transforms = {} __abstract = True def __init__(self, **params): super().__init__(**params) # Useful when updating model properties which trigger potentially # recursive events self._updating = False # A dictionary of current property change events self._events = {} # Any watchers associated with links between two objects self._links = [] self._link_params() # A dictionary of bokeh property changes being processed self._changing = {} # Sets up watchers to process manual updates to models if self._manual_params: self.param.watch(self._update_manual, self._manual_params) #---------------------------------------------------------------- # Model API #---------------------------------------------------------------- def _process_property_change(self, msg): """ Transform bokeh model property changes into parameter updates. Should be overridden to provide appropriate mapping between parameter value and bokeh model change. By default uses the _rename class level attribute to map between parameter and property names. """ inverted = {v: k for k, v in self._rename.items()} return {inverted.get(k, k): v for k, v in msg.items()} def _process_param_change(self, msg): """ Transform parameter changes into bokeh model property updates. Should be overridden to provide appropriate mapping between parameter value and bokeh model change. By default uses the _rename class level attribute to map between parameter and property names. """ properties = {self._rename.get(k, k): v for k, v in msg.items() if self._rename.get(k, False) is not None} if 'width' in properties and self.sizing_mode is None: properties['min_width'] = properties['width'] if 'height' in properties and self.sizing_mode is None: properties['min_height'] = properties['height'] return properties @property def _linkable_params(self): """ Parameters that can be linked in JavaScript via source transforms. """ return [p for p in self._synced_params if self._rename.get(p, False) is not None and self._source_transforms.get(p, False) is not None] + ['loading'] @property def _synced_params(self): """ Parameters which are synced with properties using transforms applied in the _process_param_change method. """ ignored = ['default_layout', 'loading'] return [p for p in self.param if p not in self._manual_params+ignored] def _init_params(self): return {k: v for k, v in self.param.values().items() if k in self._synced_params and v is not None} def _link_params(self): params = self._synced_params if params: watcher = self.param.watch(self._param_change, params) self._callbacks.append(watcher) def _link_props(self, model, properties, doc, root, comm=None): from .config import config ref = root.ref['id'] if config.embed: return for p in properties: if isinstance(p, tuple): _, p = p m = model if '.' in p: *subpath, p = p.split('.') for sp in subpath: m = getattr(m, sp) else: subpath = None if comm: m.on_change(p, partial(self._comm_change, doc, ref, comm, subpath)) else: m.on_change(p, partial(self._server_change, doc, ref, subpath)) def _manual_update(self, events, model, doc, root, parent, comm): """ Method for handling any manual update events, i.e. events triggered by changes in the manual params. """ def _update_manual(self, *events): for ref, (model, parent) in self._models.items(): if ref not in state._views or ref in state._fake_roots: continue viewable, root, doc, comm = state._views[ref] if comm or state._unblocked(doc): with unlocked(): self._manual_update(events, model, doc, root, parent, comm) if comm and 'embedded' not in root.tags: push(doc, comm) else: cb = partial(self._manual_update, events, model, doc, root, parent, comm) if doc.session_context: doc.add_next_tick_callback(cb) else: cb() def _apply_update(self, events, msg, model, ref): if ref not in state._views or ref in state._fake_roots: return viewable, root, doc, comm = state._views[ref] if comm or not doc.session_context or state._unblocked(doc): with unlocked(): self._update_model(events, msg, root, model, doc, comm) if comm and 'embedded' not in root.tags: push(doc, comm) else: cb = partial(self._update_model, events, msg, root, model, doc, comm) doc.add_next_tick_callback(cb) def _update_model(self, events, msg, root, model, doc, comm): ref = root.ref['id'] self._changing[ref] = attrs = [ attr for attr, value in msg.items() if not model.lookup(attr).property.matches(getattr(model, attr), value) ] try: model.update(**msg) finally: changing = [ attr for attr in self._changing.get(ref, []) if attr not in attrs ] if changing: self._changing[ref] = changing elif ref in self._changing: del self._changing[ref] def _cleanup(self, root): super()._cleanup(root) ref = root.ref['id'] self._models.pop(ref, None) comm, client_comm = self._comms.pop(ref, (None, None)) if comm: try: comm.close() except Exception: pass if client_comm: try: client_comm.close() except Exception: pass def _param_change(self, *events): msgs = [] for event in events: msg = self._process_param_change({event.name: event.new}) if msg: msgs.append(msg) events = {event.name: event for event in events} msg = {k: v for msg in msgs for k, v in msg.items()} if not msg: return for ref, (model, _) in self._models.copy().items(): self._apply_update(events, msg, model, ref) def _process_events(self, events): self._log('received events %s', events) busy = state.busy with edit_readonly(state): state.busy = True events = self._process_property_change(events) try: with edit_readonly(self): self_events = {k: v for k, v in events.items() if '.' not in k} self.param.update(**self_events) for k, v in self_events.items(): if '.' not in k: continue *subpath, p = k.split('.') obj = self for sp in subpath: obj = getattr(obj, sp) with edit_readonly(obj): obj.param.update(**{p: v}) except Exception: if len(events)>1: msg_end = f" changing properties {pformat(events)} \n" elif len(events)==1: msg_end = f" changing property {pformat(events)} \n" else: msg_end = "\n" log.exception(f'Callback failed for object named "{self.name}"{msg_end}') raise finally: self._log('finished processing events %s', events) with edit_readonly(state): state.busy = busy def _process_bokeh_event(self, event): self._log('received bokeh event %s', event) busy = state.busy with edit_readonly(state): state.busy = True try: self._process_event(event) finally: self._log('finished processing bokeh event %s', event) with edit_readonly(state): state.busy = busy async def _change_coroutine(self, doc=None): if state._thread_pool: state._thread_pool.submit(self._change_event, doc) else: with set_curdoc(doc): self._change_event(doc) async def _event_coroutine(self, event, doc): if state._thread_pool: state._thread_pool.submit(self._process_bokeh_event, event) else: with set_curdoc(doc): self._process_bokeh_event(event) def _change_event(self, doc=None): try: state.curdoc = doc events = self._events self._events = {} self._process_events(events) finally: state.curdoc = None def _schedule_change(self, doc, comm): with hold(doc, comm=comm): self._change_event(doc) def _comm_change(self, doc, ref, comm, subpath, attr, old, new): if subpath: attr = f'{subpath}.{attr}' if attr in self._changing.get(ref, []): self._changing[ref].remove(attr) return self._events.update({attr: new}) if state._thread_pool: state._thread_pool.submit(self._schedule_change, doc, comm) else: self._schedule_change(doc, comm) def _comm_event(self, event): if state._thread_pool: state._thread_pool.submit(self._process_bokeh_event, event) else: self._process_bokeh_event(event) def _server_event(self, doc, event): if doc.session_context and not state._unblocked(doc): doc.add_next_tick_callback( partial(self._event_coroutine, event, doc) ) else: self._comm_event(event) def _server_change(self, doc, ref, subpath, attr, old, new): if subpath: attr = f'{subpath}.{attr}' if attr in self._changing.get(ref, []): self._changing[ref].remove(attr) return processing = bool(self._events) self._events.update({attr: new}) if processing: return if doc.session_context: cb = partial(self._change_coroutine, doc) if attr in self._priority_changes: doc.add_next_tick_callback(cb) else: doc.add_timeout_callback(cb, self._debounce) else: self._change_event(doc) class Reactive(Syncable, Viewable): """ Reactive is a Viewable object that also supports syncing between the objects parameters and the underlying bokeh model either via the defined pyviz_comms.Comm type or using bokeh server. In addition it defines various methods which make it easy to link the parameters to other objects. """ #---------------------------------------------------------------- # Public API #---------------------------------------------------------------- def link(self, target, callbacks=None, bidirectional=False, **links): """ Links the parameters on this object to attributes on another object in Python. Supports two modes, either specify a mapping between the source and target object parameters as keywords or provide a dictionary of callbacks which maps from the source parameter to a callback which is triggered when the parameter changes. Arguments --------- target: object The target object of the link. callbacks: dict Maps from a parameter in the source object to a callback. bidirectional: boolean Whether to link source and target bi-directionally **links: dict Maps between parameters on this object to the parameters on the supplied object. """ if links and callbacks: raise ValueError('Either supply a set of parameters to ' 'link as keywords or a set of callbacks, ' 'not both.') elif not links and not callbacks: raise ValueError('Declare parameters to link or a set of ' 'callbacks, neither was defined.') elif callbacks and bidirectional: raise ValueError('Bidirectional linking not supported for ' 'explicit callbacks. You must define ' 'separate callbacks for each direction.') _updating = [] def link(*events): for event in events: if event.name in _updating: continue _updating.append(event.name) try: if callbacks: callbacks[event.name](target, event) else: setattr(target, links[event.name], event.new) finally: _updating.pop(_updating.index(event.name)) params = list(callbacks) if callbacks else list(links) cb = self.param.watch(link, params) bidirectional_watcher = None if bidirectional: _reverse_updating = [] reverse_links = {v: k for k, v in links.items()} def reverse_link(*events): for event in events: if event.name in _reverse_updating: continue _reverse_updating.append(event.name) try: setattr(self, reverse_links[event.name], event.new) finally: _reverse_updating.remove(event.name) bidirectional_watcher = target.param.watch(reverse_link, list(reverse_links)) link_args = tuple(cb) # Compatibility with Param versions where precedence is dropped # from iterator for backward compatibility with older Panel versions if 'precedence' in Watcher._fields and len(link_args) < len(Watcher._fields): link_args += (cb.precedence,) link = LinkWatcher(*(link_args+(target, links, callbacks is not None, bidirectional_watcher))) self._links.append(link) return cb def controls(self, parameters=[], jslink=True, **kwargs): """ Creates a set of widgets which allow manipulating the parameters on this instance. By default all parameters which support linking are exposed, but an explicit list of parameters can be provided. Arguments --------- parameters: list(str) An explicit list of parameters to return controls for. jslink: bool Whether to use jslinks instead of Python based links. This does not allow using all types of parameters. kwargs: dict Additional kwargs to pass to the Param pane(s) used to generate the controls widgets. Returns ------- A layout of the controls """ from .param import Param from .layout import Tabs, WidgetBox from .widgets import LiteralInput if parameters: linkable = parameters elif jslink: linkable = self._linkable_params else: linkable = list(self.param) params = [p for p in linkable if p not in Viewable.param] controls = Param(self.param, parameters=params, default_layout=WidgetBox, name='Controls', **kwargs) layout_params = [p for p in linkable if p in Viewable.param] if 'name' not in layout_params and self._rename.get('name', False) is not None and not parameters: layout_params.insert(0, 'name') style = Param(self.param, parameters=layout_params, default_layout=WidgetBox, name='Layout', **kwargs) if jslink: for p in params: widget = controls._widgets[p] widget.jslink(self, value=p, bidirectional=True) if isinstance(widget, LiteralInput): widget.serializer = 'json' for p in layout_params: widget = style._widgets[p] widget.jslink(self, value=p, bidirectional=p != 'loading') if isinstance(widget, LiteralInput): widget.serializer = 'json' if params and layout_params: return Tabs(controls.layout[0], style.layout[0]) elif params: return controls.layout[0] return style.layout[0] def jscallback(self, args={}, **callbacks): """ Allows defining a JS callback to be triggered when a property changes on the source object. The keyword arguments define the properties that trigger a callback and the JS code that gets executed. Arguments ---------- args: dict A mapping of objects to make available to the JS callback **callbacks: dict A mapping between properties on the source model and the code to execute when that property changes Returns ------- callback: Callback The Callback which can be used to disable the callback. """ from .links import Callback for k, v in list(callbacks.items()): callbacks[k] = self._rename.get(v, v) return Callback(self, code=callbacks, args=args) def jslink(self, target, code=None, args=None, bidirectional=False, **links): """ Links properties on the source object to those on the target object in JS code. Supports two modes, either specify a mapping between the source and target model properties as keywords or provide a dictionary of JS code snippets which maps from the source parameter to a JS code snippet which is executed when the property changes. Arguments ---------- target: HoloViews object or bokeh Model or panel Viewable The target to link the value to. code: dict Custom code which will be executed when the widget value changes. bidirectional: boolean Whether to link source and target bi-directionally **links: dict A mapping between properties on the source model and the target model property to link it to. Returns ------- link: GenericLink The GenericLink which can be used unlink the widget and the target model. """ if links and code: raise ValueError('Either supply a set of properties to ' 'link as keywords or a set of JS code ' 'callbacks, not both.') elif not links and not code: raise ValueError('Declare parameters to link or a set of ' 'callbacks, neither was defined.') if args is None: args = {} from .links import Link, assert_source_syncable, assert_target_syncable mapping = code or links assert_source_syncable(self, mapping) if isinstance(target, Syncable) and code is None: assert_target_syncable(self, target, mapping) return Link(self, target, properties=links, code=code, args=args, bidirectional=bidirectional) class SyncableData(Reactive): """ A baseclass for components which sync one or more data parameters with the frontend via a ColumnDataSource. """ selection = param.List(default=[], doc=""" The currently selected rows in the data.""") # Parameters which when changed require an update of the data _data_params = [] _rename = {'selection': None} __abstract = True def __init__(self, **params): super().__init__(**params) self._data = None self._processed = None self.param.watch(self._validate, self._data_params) if self._data_params: self.param.watch(self._update_cds, self._data_params) self.param.watch(self._update_selected, 'selection') self._validate() self._update_cds() def _validate(self, *events): """ Allows implementing validation for the data parameters. """ def _get_data(self): """ Implemented by subclasses converting data parameter(s) into a ColumnDataSource compatible data dictionary. Returns ------- processed: object Raw data after pre-processing (e.g. after filtering) data: dict Dictionary of columns used to instantiate and update the ColumnDataSource """ def _update_column(self, column, array): """ Implemented by subclasses converting changes in columns to changes in the data parameter. Parameters ---------- column: str The name of the column to update. array: numpy.ndarray The array data to update the column with. """ data = getattr(self, self._data_params[0]) data[column] = array if self._processed is not None: self._processed[column] = array def _update_data(self, data): self.param.update(**{self._data_params[0]: data}) def _manual_update(self, events, model, doc, root, parent, comm): for event in events: if event.type == 'triggered' and self._updating: continue elif hasattr(self, '_update_' + event.name): getattr(self, '_update_' + event.name)(model) @updating def _update_cds(self, *events): self._processed, self._data = self._get_data() msg = {'data': self._data} for ref, (m, _) in self._models.items(): self._apply_update(events, msg, m.source, ref) @updating def _update_selected(self, *events, indices=None): indices = self.selection if indices is None else indices msg = {'indices': indices} for ref, (m, _) in self._models.items(): self._apply_update(events, msg, m.source.selected, ref) def _apply_stream(self, ref, model, stream, rollover): self._changing[ref] = ['data'] try: model.source.stream(stream, rollover) finally: del self._changing[ref] @updating def _stream(self, stream, rollover=None): self._processed, _ = self._get_data() for ref, (m, _) in self._models.items(): if ref not in state._views or ref in state._fake_roots: continue viewable, root, doc, comm = state._views[ref] if comm or not doc.session_context or state._unblocked(doc): with unlocked(): m.source.stream(stream, rollover) if comm and 'embedded' not in root.tags: push(doc, comm) else: cb = partial(self._apply_stream, ref, m, stream, rollover) doc.add_next_tick_callback(cb) def _apply_patch(self, ref, model, patch): self._changing[ref] = ['data'] try: model.source.patch(patch) finally: del self._changing[ref] @updating def _patch(self, patch): for ref, (m, _) in self._models.items(): if ref not in state._views or ref in state._fake_roots: continue viewable, root, doc, comm = state._views[ref] if comm or not doc.session_context or state._unblocked(doc): with unlocked(): m.source.patch(patch) if comm and 'embedded' not in root.tags: push(doc, comm) else: cb = partial(self._apply_patch, ref, m, patch) doc.add_next_tick_callback(cb) def _update_manual(self, *events): """ Skip events triggered internally """ processed_events = [] for e in events: if e.name == self._data_params[0] and e.type == 'triggered' and self._updating: continue processed_events.append(e) super()._update_manual(*processed_events) def stream(self, stream_value, rollover=None, reset_index=True): """ Streams (appends) the `stream_value` provided to the existing value in an efficient manner. Arguments --------- stream_value: (Union[pd.DataFrame, pd.Series, Dict]) The new value(s) to append to the existing value. rollover: int A maximum column size, above which data from the start of the column begins to be discarded. If None, then columns will continue to grow unbounded. reset_index (bool, default=True): If True and the stream_value is a DataFrame, then its index is reset. Helps to keep the index unique and named `index`. Raises ------ ValueError: Raised if the stream_value is not a supported type. Examples -------- Stream a Series to a DataFrame >>> value = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}) >>> obj = DataComponent(value) >>> stream_value = pd.Series({"x": 4, "y": "d"}) >>> obj.stream(stream_value) >>> obj.value.to_dict("list") {'x': [1, 2, 4], 'y': ['a', 'b', 'd']} Stream a Dataframe to a Dataframe >>> value = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}) >>> obj = DataComponent(value) >>> stream_value = pd.DataFrame({"x": [3, 4], "y": ["c", "d"]}) >>> obj.stream(stream_value) >>> obj.value.to_dict("list") {'x': [1, 2, 3, 4], 'y': ['a', 'b', 'c', 'd']} Stream a Dictionary row to a DataFrame >>> value = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}) >>> tabulator = DataComponent(value) >>> stream_value = {"x": 4, "y": "d"} >>> obj.stream(stream_value) >>> obj.value.to_dict("list") {'x': [1, 2, 4], 'y': ['a', 'b', 'd']} Stream a Dictionary of Columns to a Dataframe >>> value = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}) >>> obj = DataComponent(value) >>> stream_value = {"x": [3, 4], "y": ["c", "d"]} >>> obj.stream(stream_value) >>> obj.value.to_dict("list") {'x': [1, 2, 3, 4], 'y': ['a', 'b', 'c', 'd']} """ if 'pandas' in sys.modules: import pandas as pd else: pd = None if pd and isinstance(stream_value, pd.DataFrame): if isinstance(self._processed, dict): self.stream(stream_value.to_dict(), rollover) return if reset_index: value_index_start = self._processed.index.max() + 1 stream_value = stream_value.reset_index(drop=True) stream_value.index += value_index_start combined = pd.concat([self._processed, stream_value]) if rollover is not None: combined = combined.iloc[-rollover:] with param.discard_events(self): self._update_data(combined) try: self._updating = True self.param.trigger(self._data_params[0]) finally: self._updating = False self._stream(stream_value, rollover) elif pd and isinstance(stream_value, pd.Series): if isinstance(self._processed, dict): self.stream({k: [v] for k, v in stream_value.to_dict().items()}, rollover) return value_index_start = self._processed.index.max() + 1 self._processed.loc[value_index_start] = stream_value with param.discard_events(self): self._update_data(self._processed) self._stream(self._processed.iloc[-1:], rollover) elif isinstance(stream_value, dict): if isinstance(self._processed, dict): if not all(col in stream_value for col in self._data): raise ValueError("Stream update must append to all columns.") for col, array in stream_value.items(): combined = np.concatenate([self._data[col], array]) if rollover is not None: combined = combined[-rollover:] self._update_column(col, combined) self._stream(stream_value, rollover) else: try: stream_value = pd.DataFrame(stream_value) except ValueError: stream_value = pd.Series(stream_value) self.stream(stream_value) else: raise ValueError("The stream value provided is not a DataFrame, Series or Dict!") def patch(self, patch_value): """ Efficiently patches (updates) the existing value with the `patch_value`. Arguments --------- patch_value: (Union[pd.DataFrame, pd.Series, Dict]) The value(s) to patch the existing value with. Raises ------ ValueError: Raised if the patch_value is not a supported type. Examples -------- Patch a DataFrame with a Dictionary row. >>> value = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}) >>> obj = DataComponent(value) >>> patch_value = {"x": [(0, 3)]} >>> obj.patch(patch_value) >>> obj.value.to_dict("list") {'x': [3, 2], 'y': ['a', 'b']} Patch a Dataframe with a Dictionary of Columns. >>> value = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}) >>> obj = DataComponent(value) >>> patch_value = {"x": [(slice(2), (3,4))], "y": [(1,'d')]} >>> obj.patch(patch_value) >>> obj.value.to_dict("list") {'x': [3, 4], 'y': ['a', 'd']} Patch a DataFrame with a Series. Please note the index is used in the update. >>> value = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}) >>> obj = DataComponent(value) >>> patch_value = pd.Series({"index": 1, "x": 4, "y": "d"}) >>> obj.patch(patch_value) >>> obj.value.to_dict("list") {'x': [1, 4], 'y': ['a', 'd']} Patch a Dataframe with a Dataframe. Please note the index is used in the update. >>> value = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}) >>> obj = DataComponent(value) >>> patch_value = pd.DataFrame({"x": [3, 4], "y": ["c", "d"]}) >>> obj.patch(patch_value) >>> obj.value.to_dict("list") {'x': [3, 4], 'y': ['c', 'd']} """ if self._processed is None or isinstance(patch_value, dict): self._patch(patch_value) return if 'pandas' in sys.modules: import pandas as pd else: pd = None data = getattr(self, self._data_params[0]) if pd and isinstance(patch_value, pd.DataFrame): patch_value_dict = {} for column in patch_value.columns: patch_value_dict[column] = [] for index in patch_value.index: patch_value_dict[column].append((index, patch_value.loc[index, column])) self.patch(patch_value_dict) elif pd and isinstance(patch_value, pd.Series): if "index" in patch_value: # Series orient is row patch_value_dict = { k: [(patch_value["index"], v)] for k, v in patch_value.items() } patch_value_dict.pop("index") else: # Series orient is column patch_value_dict = { patch_value.name: [(index, value) for index, value in patch_value.items()] } self.patch(patch_value_dict) elif isinstance(patch_value, dict): for k, v in patch_value.items(): for index, patch in v: if pd and isinstance(self._processed, pd.DataFrame): data.loc[index, k] = patch else: data[k][index] = patch self._updating = True try: self._patch(patch_value) finally: self._updating = False else: raise ValueError( f"Patching with a patch_value of type {type(patch_value).__name__} " "is not supported. Please provide a DataFrame, Series or Dict." ) class ReactiveData(SyncableData): """ An extension of SyncableData which bi-directionally syncs a data parameter between frontend and backend using a ColumnDataSource. """ def __init__(self, **params): super().__init__(**params) self._old = None def _update_selection(self, indices): self.selection = indices def _convert_column(self, values, old_values): dtype = old_values.dtype if dtype.kind == 'M': if values.dtype.kind in 'if': values = (values * 10e5).astype(dtype) elif dtype.kind == 'O': if (all(isinstance(ov, dt.date) for ov in old_values) and not all(isinstance(iv, dt.date) for iv in values)): new_values = [] for iv in values: if isinstance(iv, dt.datetime): iv = iv.date() elif not isinstance(iv, dt.date): iv = dt.date.fromtimestamp(iv/1000) new_values.append(iv) values = new_values else: values = values.astype(dtype) return values def _process_data(self, data): if self._updating: return # Get old data to compare to old_raw, old_data = self._get_data() self._old = old_raw = old_raw.copy() if hasattr(old_raw, 'columns'): columns = list(old_raw.columns) else: columns = list(old_raw) updated = False for col, values in data.items(): col = self._renamed_cols.get(col, col) if col in self.indexes or col not in columns: continue if isinstance(values, dict): sorted_values = sorted(values.items(), key=lambda it: int(it[0])) values = [v for _, v in sorted_values] values = self._convert_column( np.asarray(values), old_raw[col] ) try: isequal = (old_raw[col] == values).all() except Exception: isequal = False if not isequal: self._update_column(col, values) updated = True # If no columns were updated we don't have to sync data if not updated: return # Ensure we trigger events self._updating = True old_data = getattr(self, self._data_params[0]) try: if old_data is self.value: with param.discard_events(self): self.value = old_raw self.value = old_data else: self.param.trigger('value') finally: self._updating = False # Ensure that if the data was changed in a user # callback, we still send the updated data if old_data is not self.value: self._update_cds() def _process_events(self, events): if 'data' in events: self._process_data(events.pop('data')) if 'indices' in events: self._updating = True try: self._update_selection(events.pop('indices')) finally: self._updating = False super(ReactiveData, self)._process_events(events) class ReactiveHTMLMetaclass(ParameterizedMetaclass): """ Parses the ReactiveHTML._template of the class and initializes variables, callbacks and the data model to sync the parameters and HTML attributes. """ _loaded_extensions = set() _name_counter = Counter() _script_regex = r"script\([\"|'](.*)[\"|']\)" def __init__(mcs, name, bases, dict_): from .io.datamodel import PARAM_MAPPING, construct_data_model mcs.__original_doc__ = mcs.__doc__ ParameterizedMetaclass.__init__(mcs, name, bases, dict_) cls_name = mcs.__name__ # Validate _child_config for name, child_type in mcs._child_config.items(): if name not in mcs.param: raise ValueError( f"{cls_name}._child_config for {name!r} does not " "match any parameters. Ensure the name of each " "child config matches one of the parameters." ) elif child_type not in ('model', 'template', 'literal'): raise ValueError( f"{cls_name}._child_config for {name!r} child " "parameter declares unknown type {child_type!r}. " f"The '_child_config' mode must be one of 'model', " "'template' or 'literal'." ) mcs._parser = ReactiveHTMLParser(mcs) mcs._parser.feed(mcs._template) # Ensure syntactically valid jinja2 for loops if mcs._parser._open_for: raise ValueError( f"{cls_name}._template contains for loop without closing {{% endfor %}} statement." ) # Ensure there are no open tags if mcs._parser._node_stack: raise ValueError( f"{cls_name}._template contains tags which were never " "closed. Ensure all tags in your template have a " "matching closing tag, e.g. if there is a tag