""" Collection of either extremely generic or simple Operation examples. """ from __future__ import division import warnings import numpy as np import param from param import _is_number from ..core import (Operation, NdOverlay, Overlay, GridMatrix, HoloMap, Dataset, Element, Collator, Dimension) from ..core.data import ArrayInterface, DictInterface, default_datatype from ..core.data.util import dask_array_module from ..core.util import ( LooseVersion, group_sanitizer, label_sanitizer, pd, basestring, datetime_types, isfinite, dt_to_int, isdatetime, is_dask_array, is_cupy_array, is_ibis_expr ) from ..element.chart import Histogram, Scatter from ..element.raster import Image, RGB from ..element.path import Contours, Polygons from ..element.util import categorical_aggregate2d # noqa (API import) from ..streams import RangeXY column_interfaces = [ArrayInterface, DictInterface] if pd: from ..core.data import PandasInterface column_interfaces.append(PandasInterface) def identity(x,k): return x class operation(Operation): """ The most generic operation that wraps any callable into an Operation. The callable needs to accept an HoloViews component and a key (that may be ignored) and must return a new HoloViews component. This class may be useful for turning a HoloViews method into an operation to define as compositor operation. For instance, the following definition: operation.instance(op=lambda x, k: x.collapse(np.subtract)) Could be used to implement a collapse operation to subtracts the data between Rasters in an Overlay. """ output_type = param.Parameter(None, doc=""" The output element type which may be None to disable type checking. May be used to declare useful information to other code in HoloViews, e.g. required for tab-completion support of operations registered with compositors.""") group = param.String(default='Operation', doc=""" The group assigned to the result after having applied the operator.""") op = param.Callable(default=identity, doc=""" The operation used to generate a new HoloViews object returned by the operation. By default, the identity operation is applied.""") def _process(self, view, key=None): retval = self.p.op(view, key) if (self.p.output_type is not None): assert isinstance(retval, self.p.output_type), \ "Return value does not match the declared output type." return retval.relabel(group=self.p.group) class factory(Operation): """ Simple operation that constructs any element that accepts some other element as input. For instance, RGB and HSV elements can be created from overlays of Image elements. """ output_type = param.Parameter(RGB, doc=""" The output type of the factor operation. By default, if three overlaid Images elements are supplied, the corresponding RGB element will be returned. """) args = param.List(default=[], doc=""" The list of positional argument to pass to the factory""") kwargs = param.Dict(default={}, doc=""" The dict of keyword arguments to pass to the factory""") def _process(self, view, key=None): return self.p.output_type(view, *self.p.args, **self.p.kwargs) class function(Operation): output_type = param.ClassSelector(class_=type, doc=""" The output type of the method operation""") input_type = param.ClassSelector(class_=type, doc=""" The object type the method is defined on""") fn = param.Callable(default=lambda el, *args, **kwargs: el, doc=""" The function to apply.""") args = param.List(default=[], doc=""" The list of positional argument to pass to the method""") kwargs = param.Dict(default={}, doc=""" The dict of keyword arguments to pass to the method""") def _process(self, element, key=None): return self.p.fn(element, *self.p.args, **self.p.kwargs) class method(Operation): """ Operation that wraps a method call """ output_type = param.ClassSelector(class_=type, doc=""" The output type of the method operation""") input_type = param.ClassSelector(class_=type, doc=""" The object type the method is defined on""") method_name = param.String(default='__call__', doc=""" The method name""") args = param.List(default=[], doc=""" The list of positional argument to pass to the method""") kwargs = param.Dict(default={}, doc=""" The dict of keyword arguments to pass to the method""") def _process(self, element, key=None): fn = getattr(self.p.input_type, self.p.method_name) return fn(element, *self.p.args, **self.p.kwargs) class apply_when(param.ParameterizedFunction): """ Applies a selection depending on the current zoom range. If the supplied predicate function returns a True it will apply the operation otherwise it will return the raw element after the selection. For example the following will apply datashading if the number of points in the current viewport exceed 1000 otherwise just returning the selected points element: apply_when(points, operation=datashade, predicate=lambda x: x > 1000) """ operation = param.Callable(default=lambda x: x) predicate = param.Callable(default=None) def _apply(self, element, x_range, y_range, invert=False): selected = element if x_range is not None and y_range is not None: selected = element[x_range, y_range] condition = self.predicate(selected) if (not invert and condition) or (invert and not condition): return selected elif selected.interface.gridded: return selected.clone([]) else: return selected.iloc[:0] def __call__(self, obj, **params): if 'streams' in params: streams = params.pop('streams') else: streams = [RangeXY()] self.param.set_param(**params) if not self.predicate: raise ValueError( 'Must provide a predicate function to determine when ' 'to apply the operation and when to return the selected ' 'data.' ) applied = self.operation(obj.apply(self._apply, streams=streams)) raw = obj.apply(self._apply, streams=streams, invert=True) return applied * raw class chain(Operation): """ Defining an Operation chain is an easy way to define a new Operation from a series of existing ones. The argument is a list of Operation (or Operation instances) that are called in sequence to generate the returned element. chain(operations=[gradient, threshold.instance(level=2)]) This operation can accept an Image instance and would first compute the gradient before thresholding the result at a level of 2.0. Instances are only required when arguments need to be passed to individual operations so the resulting object is a function over a single argument. """ output_type = param.Parameter(Image, doc=""" The output type of the chain operation. Must be supplied if the chain is to be used as a channel operation.""") group = param.String(default='', doc=""" The group assigned to the result after having applied the chain. Defaults to the group produced by the last operation in the chain""") operations = param.List(default=[], class_=Operation, doc=""" A list of Operations (or Operation instances) that are applied on the input from left to right.""") def _process(self, view, key=None): processed = view for i, operation in enumerate(self.p.operations): processed = operation.process_element( processed, key, input_ranges=self.p.input_ranges ) if not self.p.group: return processed else: return processed.clone(group=self.p.group) class transform(Operation): """ Generic Operation to transform an input Image or RGBA element into an output Image. The transformation is defined by the supplied callable that accepts the data of the input Image (typically a numpy array) and returns the transformed data of the output Image. This operator is extremely versatile; for instance, you could implement an alternative to the explicit threshold operator with: operator=lambda x: np.clip(x, 0, 0.5) Alternatively, you can implement a transform computing the 2D autocorrelation using the scipy library with: operator=lambda x: scipy.signal.correlate2d(x, x) """ output_type = Image group = param.String(default='Transform', doc=""" The group assigned to the result after applying the transform.""") operator = param.Callable(doc=""" Function of one argument that transforms the data in the input Image to the data in the output Image. By default, acts as the identity function such that the output matches the input.""") def _process(self, img, key=None): processed = (img.data if not self.p.operator else self.p.operator(img.data)) return img.clone(processed, group=self.p.group) class image_overlay(Operation): """ Operation to build a overlay of images to a specification from a subset of the required elements. This is useful for reordering the elements of an overlay, duplicating layers of an overlay or creating blank image elements in the appropriate positions. For instance, image_overlay may build a three layered input suitable for the RGB factory operation even if supplied with one or two of the required channels (creating blank channels for the missing elements). Note that if there is any ambiguity regarding the match, the strongest match will be used. In the case of a tie in match strength, the first layer in the input is used. One successful match is always required. """ output_type = Overlay spec = param.String(doc=""" Specification of the output Overlay structure. For instance: Image.R * Image.G * Image.B Will ensure an overlay of this structure is created even if (for instance) only (Image.R * Image.B) is supplied. Elements in the input overlay that match are placed in the appropriate positions and unavailable specification elements are created with the specified fill group.""") fill = param.Number(default=0) default_range = param.Tuple(default=(0,1), doc=""" The default range that will be set on the value_dimension of any automatically created blank image elements.""") group = param.String(default='Transform', doc=""" The group assigned to the resulting overlay.""") @classmethod def _match(cls, el, spec): "Return the strength of the match (None if no match)" spec_dict = dict(zip(['type', 'group', 'label'], spec.split('.'))) if not isinstance(el, Image) or spec_dict['type'] != 'Image': raise NotImplementedError("Only Image currently supported") sanitizers = {'group':group_sanitizer, 'label':label_sanitizer} strength = 1 for key in ['group', 'label']: attr_value = sanitizers[key](getattr(el, key)) if key in spec_dict: if spec_dict[key] != attr_value: return None strength += 1 return strength def _match_overlay(self, raster, overlay_spec): """ Given a raster or input overlay, generate a list of matched elements (None if no match) and corresponding tuple of match strength values. """ ordering = [None]*len(overlay_spec) # Elements to overlay strengths = [0]*len(overlay_spec) # Match strengths elements = raster.values() if isinstance(raster, Overlay) else [raster] for el in elements: for pos in range(len(overlay_spec)): strength = self._match(el, overlay_spec[pos]) if strength is None: continue # No match elif (strength <= strengths[pos]): continue # Weaker match else: # Stronger match ordering[pos] = el strengths[pos] = strength return ordering, strengths def _process(self, raster, key=None): specs = tuple(el.strip() for el in self.p.spec.split('*')) ordering, strengths = self._match_overlay(raster, specs) if all(el is None for el in ordering): raise Exception("The image_overlay operation requires at least one match") completed = [] strongest = ordering[np.argmax(strengths)] for el, spec in zip(ordering, specs): if el is None: spec_dict = dict(zip(['type', 'group', 'label'], spec.split('.'))) el = Image(np.ones(strongest.data.shape) * self.p.fill, group=spec_dict.get('group','Image'), label=spec_dict.get('label','')) el.vdims[0].range = self.p.default_range completed.append(el) return np.prod(completed) class threshold(Operation): """ Threshold a given Image whereby all values higher than a given level map to the specified high value and all values lower than that level map to the specified low value. """ output_type = Image level = param.Number(default=0.5, doc=""" The value at which the threshold is applied. Values lower than the threshold map to the 'low' value and values above map to the 'high' value.""") high = param.Number(default=1.0, doc=""" The value given to elements greater than (or equal to) the threshold.""") low = param.Number(default=0.0, doc=""" The value given to elements below the threshold.""") group = param.String(default='Threshold', doc=""" The group assigned to the thresholded output.""") _per_element = True def _process(self, matrix, key=None): if not isinstance(matrix, Image): raise TypeError("The threshold operation requires a Image as input.") arr = matrix.data high = np.ones(arr.shape) * self.p.high low = np.ones(arr.shape) * self.p.low thresholded = np.where(arr > self.p.level, high, low) return matrix.clone(thresholded, group=self.p.group) class gradient(Operation): """ Compute the gradient plot of the supplied Image. If the Image value dimension is cyclic, the smallest step is taken considered the cyclic range """ output_type = Image group = param.String(default='Gradient', doc=""" The group assigned to the output gradient matrix.""") _per_element = True def _process(self, matrix, key=None): if len(matrix.vdims) != 1: raise ValueError("Input matrix to gradient operation must " "have single value dimension.") matrix_dim = matrix.vdims[0] data = np.flipud(matrix.dimension_values(matrix_dim, flat=False)) r, c = data.shape if matrix_dim.cyclic and (None in matrix_dim.range): raise Exception("Cyclic range must be specified to compute " "the gradient of cyclic quantities") cyclic_range = None if not matrix_dim.cyclic else np.diff(matrix_dim.range) if cyclic_range is not None: # shift values such that wrapping works ok data = data - matrix_dim.range[0] dx = np.diff(data, 1, axis=1)[0:r-1, 0:c-1] dy = np.diff(data, 1, axis=0)[0:r-1, 0:c-1] if cyclic_range is not None: # Wrap into the specified range # Convert negative differences to an equivalent positive value dx = dx % cyclic_range dy = dy % cyclic_range # # Prefer small jumps dx_negatives = dx - cyclic_range dy_negatives = dy - cyclic_range dx = np.where(np.abs(dx_negatives) LooseVersion('8.0') if not full_cupy_support and (normed or self.p.weight_dimension): data = cupy.asnumpy(data) is_cupy = False else: is_finite = cupy.isfinite # Mask data if is_ibis_expr(data): mask = data.notnull() if self.p.nonzero: mask = mask & (data != 0) data = data.to_projection() data = data[mask] no_data = not len(data.head(1).execute()) data = data[dim.name] else: mask = is_finite(data) if self.p.nonzero: mask = mask & (data != 0) data = data[mask] da = dask_array_module() no_data = False if da and isinstance(data, da.Array) else not len(data) # Compute weights if self.p.weight_dimension: if hasattr(element, 'interface'): weights = element.interface.values(element, self.p.weight_dimension, compute=False) else: weights = element.dimension_values(self.p.weight_dimension) weights = weights[mask] else: weights = None # Compute bins if isinstance(self.p.bins, str): bin_data = cupy.asnumpy(data) if is_cupy else data edges = np.histogram_bin_edges(bin_data, bins=self.p.bins) elif isinstance(self.p.bins, (list, np.ndarray)): edges = self.p.bins if isdatetime(edges): edges = edges.astype('datetime64[ns]').astype('int64') else: hist_range = self.p.bin_range or element.range(selected_dim) # Suppress a warning emitted by Numpy when datetime or timedelta scalars # are compared. See https://github.com/numpy/numpy/issues/10095 and # https://github.com/numpy/numpy/issues/9210. with warnings.catch_warnings(): warnings.filterwarnings( action='ignore', message='elementwise comparison failed', category=DeprecationWarning ) null_hist_range = hist_range == (0, 0) # Avoids range issues including zero bin range and empty bins if null_hist_range or any(not isfinite(r) for r in hist_range): hist_range = (0, 1) steps = self.p.num_bins + 1 start, end = hist_range if is_datetime: start, end = dt_to_int(start, 'ns'), dt_to_int(end, 'ns') if self.p.log: bin_min = max([abs(start), data[data>0].min()]) edges = np.logspace(np.log10(bin_min), np.log10(end), steps) else: edges = np.linspace(start, end, steps) if is_cupy: edges = cupy.asarray(edges) if not is_dask_array(data) and no_data: nbins = self.p.num_bins if self.p.bins is None else len(self.p.bins)-1 hist = np.zeros(nbins) elif hasattr(element, 'interface'): density = True if normed else False hist, edges = element.interface.histogram( data, edges, density=density, weights=weights ) if normed == 'height': hist /= hist.max() if self.p.weight_dimension and self.p.mean_weighted: hist_mean, _ = element.interface.histogram( data, density=False, bins=edges ) hist /= hist_mean elif normed: # This covers True, 'height', 'integral' hist, edges = np.histogram(data, density=True, weights=weights, bins=edges) if normed == 'height': hist /= hist.max() else: hist, edges = np.histogram(data, normed=normed, weights=weights, bins=edges) if self.p.weight_dimension and self.p.mean_weighted: hist_mean, _ = np.histogram(data, density=False, bins=self.p.num_bins) hist /= hist_mean hist[np.isnan(hist)] = 0 if is_datetime: edges = (edges/1e3).astype('datetime64[us]') params = {} if self.p.weight_dimension: params['vdims'] = [element.get_dimension(self.p.weight_dimension)] elif self.p.frequency_label: label = self.p.frequency_label.format(dim=dim.pprint_label) params['vdims'] = [Dimension('Frequency', label=label)] else: label = 'Frequency' if normed else 'Count' params['vdims'] = [Dimension('{0}_{1}'.format(dim.name, label.lower()), label=label)] if element.group != element.__class__.__name__: params['group'] = element.group if self.p.cumulative: hist = np.cumsum(hist) if self.p.normed in (True, 'integral'): hist *= edges[1]-edges[0] # Save off the computed bin edges so that if this operation instance # is used to compute another histogram, it will default to the same # bin edges. self.bins = list(edges) return Histogram((edges, hist), kdims=[element.get_dimension(selected_dim)], label=element.label, **params) class decimate(Operation): """ Decimates any column based Element to a specified number of random rows if the current element defined by the x_range and y_range contains more than max_samples. By default the operation returns a DynamicMap with a RangeXY stream allowing dynamic downsampling. """ dynamic = param.Boolean(default=True, doc=""" Enables dynamic processing by default.""") link_inputs = param.Boolean(default=True, doc=""" By default, the link_inputs parameter is set to True so that when applying shade, backends that support linked streams update RangeXY streams on the inputs of the shade operation.""") max_samples = param.Integer(default=5000, doc=""" Maximum number of samples to display at the same time.""") random_seed = param.Integer(default=42, doc=""" Seed used to initialize randomization.""") streams = param.ClassSelector(default=[RangeXY], class_=(dict, list), doc=""" List of streams that are applied if dynamic=True, allowing for dynamic interaction with the plot.""") x_range = param.NumericTuple(default=None, length=2, doc=""" The x_range as a tuple of min and max x-value. Auto-ranges if set to None.""") y_range = param.NumericTuple(default=None, length=2, doc=""" The x_range as a tuple of min and max y-value. Auto-ranges if set to None.""") _per_element = True def _process_layer(self, element, key=None): if not isinstance(element, Dataset): raise ValueError("Cannot downsample non-Dataset types.") if element.interface not in column_interfaces: element = element.clone(tuple(element.columns().values())) xstart, xend = self.p.x_range if self.p.x_range else element.range(0) ystart, yend = self.p.y_range if self.p.y_range else element.range(1) # Slice element to current ranges xdim, ydim = element.dimensions(label=True)[0:2] sliced = element.select(**{xdim: (xstart, xend), ydim: (ystart, yend)}) if len(sliced) > self.p.max_samples: prng = np.random.RandomState(self.p.random_seed) return sliced.iloc[prng.choice(len(sliced), self.p.max_samples, False)] return sliced def _process(self, element, key=None): return element.map(self._process_layer, Element) class interpolate_curve(Operation): """ Resamples a Curve using the defined interpolation method, e.g. to represent changes in y-values as steps. """ interpolation = param.ObjectSelector(objects=['steps-pre', 'steps-mid', 'steps-post', 'linear'], default='steps-mid', doc=""" Controls the transition point of the step along the x-axis.""") _per_element = True @classmethod def pts_to_prestep(cls, x, values): steps = np.zeros(2 * len(x) - 1) value_steps = tuple(np.empty(2 * len(x) - 1, dtype=v.dtype) for v in values) steps[0::2] = x steps[1::2] = steps[0:-2:2] val_arrays = [] for v, s in zip(values, value_steps): s[0::2] = v s[1::2] = s[2::2] val_arrays.append(s) return steps, tuple(val_arrays) @classmethod def pts_to_midstep(cls, x, values): steps = np.zeros(2 * len(x)) value_steps = tuple(np.empty(2 * len(x), dtype=v.dtype) for v in values) steps[1:-1:2] = steps[2::2] = x[:-1] + (x[1:] - x[:-1])/2 steps[0], steps[-1] = x[0], x[-1] val_arrays = [] for v, s in zip(values, value_steps): s[0::2] = v s[1::2] = s[0::2] val_arrays.append(s) return steps, tuple(val_arrays) @classmethod def pts_to_poststep(cls, x, values): steps = np.zeros(2 * len(x) - 1) value_steps = tuple(np.empty(2 * len(x) - 1, dtype=v.dtype) for v in values) steps[0::2] = x steps[1::2] = steps[2::2] val_arrays = [] for v, s in zip(values, value_steps): s[0::2] = v s[1::2] = s[0:-2:2] val_arrays.append(s) return steps, tuple(val_arrays) def _process_layer(self, element, key=None): INTERPOLATE_FUNCS = {'steps-pre': self.pts_to_prestep, 'steps-mid': self.pts_to_midstep, 'steps-post': self.pts_to_poststep} if self.p.interpolation not in INTERPOLATE_FUNCS: return element x = element.dimension_values(0) is_datetime = isdatetime(x) if is_datetime: dt_type = 'datetime64[ns]' x = x.astype(dt_type) dvals = tuple(element.dimension_values(d) for d in element.dimensions()[1:]) xs, dvals = INTERPOLATE_FUNCS[self.p.interpolation](x, dvals) if is_datetime: xs = xs.astype(dt_type) return element.clone((xs,)+dvals) def _process(self, element, key=None): return element.map(self._process_layer, Element) #==================# # Other operations # #==================# class collapse(Operation): """ Given an overlay of Element types, collapse into single Element object using supplied function. Collapsing aggregates over the key dimensions of each object applying the supplied fn to each group. This is an example of an Operation that does not involve any Raster types. """ fn = param.Callable(default=np.mean, doc=""" The function that is used to collapse the curve y-values for each x-value.""") def _process(self, overlay, key=None): if isinstance(overlay, NdOverlay): collapse_map = HoloMap(overlay) else: collapse_map = HoloMap({i: el for i, el in enumerate(overlay)}) return collapse_map.collapse(function=self.p.fn) class gridmatrix(param.ParameterizedFunction): """ The gridmatrix operation takes an Element or HoloMap of Elements as input and creates a GridMatrix object, which plots each dimension in the Element against each other dimension. This provides a very useful overview of high-dimensional data and is inspired by pandas and seaborn scatter_matrix implementations. """ chart_type = param.Parameter(default=Scatter, doc=""" The Element type used to display bivariate distributions of the data.""") diagonal_type = param.Parameter(default=None, doc=""" The Element type along the diagonal, may be a Histogram or any other plot type which can visualize a univariate distribution. This parameter overrides diagonal_operation.""") diagonal_operation = param.Parameter(default=histogram, doc=""" The operation applied along the diagonal, may be a histogram-operation or any other function which returns a viewable element.""") overlay_dims = param.List(default=[], doc=""" If a HoloMap is supplied, this will allow overlaying one or more of its key dimensions.""") def __call__(self, data, **params): p = param.ParamOverrides(self, params) if isinstance(data, (HoloMap, NdOverlay)): ranges = {d.name: data.range(d) for d in data.dimensions()} data = data.clone({k: GridMatrix(self._process(p, v, ranges)) for k, v in data.items()}) data = Collator(data, merge_type=type(data))() if p.overlay_dims: data = data.map(lambda x: x.overlay(p.overlay_dims), (HoloMap,)) return data elif isinstance(data, Element): data = self._process(p, data) return GridMatrix(data) def _process(self, p, element, ranges={}): # Creates a unified Dataset.data attribute # to draw the data from if isinstance(element.data, np.ndarray): el_data = element.table(default_datatype) else: el_data = element.data # Get dimensions to plot against each other types = (str, basestring, np.str_, np.object_)+datetime_types dims = [d for d in element.dimensions() if _is_number(element.range(d)[0]) and not issubclass(element.get_dimension_type(d), types)] permuted_dims = [(d1, d2) for d1 in dims for d2 in dims[::-1]] # Convert Histogram type to operation to avoid one case in the if below. if p.diagonal_type is Histogram: p.diagonal_type = None p.diagonal_operation = histogram data = {} for d1, d2 in permuted_dims: if d1 == d2: if p.diagonal_type is not None: if p.diagonal_type._auto_indexable_1d: el = p.diagonal_type(el_data, kdims=[d1], vdims=[d2], datatype=[default_datatype]) else: values = element.dimension_values(d1) el = p.diagonal_type(values, kdims=[d1]) elif p.diagonal_operation is None: continue elif p.diagonal_operation is histogram or isinstance(p.diagonal_operation, histogram): bin_range = ranges.get(d1.name, element.range(d1)) el = p.diagonal_operation(element, dimension=d1.name, bin_range=bin_range) else: el = p.diagonal_operation(element, dimension=d1.name) else: kdims, vdims = ([d1, d2], []) if len(p.chart_type.kdims) == 2 else (d1, d2) el = p.chart_type(el_data, kdims=kdims, vdims=vdims, datatype=[default_datatype]) data[(d1.name, d2.name)] = el return data