# Code adapted from "upfirdn" python library with permission: # # Copyright (c) 2009, Motorola, Inc # # All Rights Reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # * Neither the name of Motorola nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, # THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import numpy as np from ._upfirdn_apply import _output_len, _apply, mode_enum __all__ = ['upfirdn', '_output_len'] _upfirdn_modes = [ 'constant', 'wrap', 'edge', 'smooth', 'symmetric', 'reflect', 'antisymmetric', 'antireflect', 'line', ] def _pad_h(h, up): """Store coefficients in a transposed, flipped arrangement. For example, suppose upRate is 3, and the input number of coefficients is 10, represented as h[0], ..., h[9]. Then the internal buffer will look like this:: h[9], h[6], h[3], h[0], // flipped phase 0 coefs 0, h[7], h[4], h[1], // flipped phase 1 coefs (zero-padded) 0, h[8], h[5], h[2], // flipped phase 2 coefs (zero-padded) """ h_padlen = len(h) + (-len(h) % up) h_full = np.zeros(h_padlen, h.dtype) h_full[:len(h)] = h h_full = h_full.reshape(-1, up).T[:, ::-1].ravel() return h_full def _check_mode(mode): mode = mode.lower() enum = mode_enum(mode) return enum class _UpFIRDn: """Helper for resampling.""" def __init__(self, h, x_dtype, up, down): h = np.asarray(h) if h.ndim != 1 or h.size == 0: raise ValueError('h must be 1-D with non-zero length') self._output_type = np.result_type(h.dtype, x_dtype, np.float32) h = np.asarray(h, self._output_type) self._up = int(up) self._down = int(down) if self._up < 1 or self._down < 1: raise ValueError('Both up and down must be >= 1') # This both transposes, and "flips" each phase for filtering self._h_trans_flip = _pad_h(h, self._up) self._h_trans_flip = np.ascontiguousarray(self._h_trans_flip) self._h_len_orig = len(h) def apply_filter(self, x, axis=-1, mode='constant', cval=0): """Apply the prepared filter to the specified axis of N-D signal x.""" output_len = _output_len(self._h_len_orig, x.shape[axis], self._up, self._down) # Explicit use of np.int64 for output_shape dtype avoids OverflowError # when allocating large array on platforms where np.int_ is 32 bits output_shape = np.asarray(x.shape, dtype=np.int64) output_shape[axis] = output_len out = np.zeros(output_shape, dtype=self._output_type, order='C') axis = axis % x.ndim mode = _check_mode(mode) _apply(np.asarray(x, self._output_type), self._h_trans_flip, out, self._up, self._down, axis, mode, cval) return out def upfirdn(h, x, up=1, down=1, axis=-1, mode='constant', cval=0): """Upsample, FIR filter, and downsample. Parameters ---------- h : array_like 1-D FIR (finite-impulse response) filter coefficients. x : array_like Input signal array. up : int, optional Upsampling rate. Default is 1. down : int, optional Downsampling rate. Default is 1. axis : int, optional The axis of the input data array along which to apply the linear filter. The filter is applied to each subarray along this axis. Default is -1. mode : str, optional The signal extension mode to use. The set ``{"constant", "symmetric", "reflect", "edge", "wrap"}`` correspond to modes provided by `numpy.pad`. ``"smooth"`` implements a smooth extension by extending based on the slope of the last 2 points at each end of the array. ``"antireflect"`` and ``"antisymmetric"`` are anti-symmetric versions of ``"reflect"`` and ``"symmetric"``. The mode `"line"` extends the signal based on a linear trend defined by the first and last points along the ``axis``. .. versionadded:: 1.4.0 cval : float, optional The constant value to use when ``mode == "constant"``. .. versionadded:: 1.4.0 Returns ------- y : ndarray The output signal array. Dimensions will be the same as `x` except for along `axis`, which will change size according to the `h`, `up`, and `down` parameters. Notes ----- The algorithm is an implementation of the block diagram shown on page 129 of the Vaidyanathan text [1]_ (Figure 4.3-8d). The direct approach of upsampling by factor of P with zero insertion, FIR filtering of length ``N``, and downsampling by factor of Q is O(N*Q) per output sample. The polyphase implementation used here is O(N/P). .. versionadded:: 0.18 References ---------- .. [1] P. P. Vaidyanathan, Multirate Systems and Filter Banks, Prentice Hall, 1993. Examples -------- Simple operations: >>> from scipy.signal import upfirdn >>> upfirdn([1, 1, 1], [1, 1, 1]) # FIR filter array([ 1., 2., 3., 2., 1.]) >>> upfirdn([1], [1, 2, 3], 3) # upsampling with zeros insertion array([ 1., 0., 0., 2., 0., 0., 3., 0., 0.]) >>> upfirdn([1, 1, 1], [1, 2, 3], 3) # upsampling with sample-and-hold array([ 1., 1., 1., 2., 2., 2., 3., 3., 3.]) >>> upfirdn([.5, 1, .5], [1, 1, 1], 2) # linear interpolation array([ 0.5, 1. , 1. , 1. , 1. , 1. , 0.5, 0. ]) >>> upfirdn([1], np.arange(10), 1, 3) # decimation by 3 array([ 0., 3., 6., 9.]) >>> upfirdn([.5, 1, .5], np.arange(10), 2, 3) # linear interp, rate 2/3 array([ 0. , 1. , 2.5, 4. , 5.5, 7. , 8.5, 0. ]) Apply a single filter to multiple signals: >>> x = np.reshape(np.arange(8), (4, 2)) >>> x array([[0, 1], [2, 3], [4, 5], [6, 7]]) Apply along the last dimension of ``x``: >>> h = [1, 1] >>> upfirdn(h, x, 2) array([[ 0., 0., 1., 1.], [ 2., 2., 3., 3.], [ 4., 4., 5., 5.], [ 6., 6., 7., 7.]]) Apply along the 0th dimension of ``x``: >>> upfirdn(h, x, 2, axis=0) array([[ 0., 1.], [ 0., 1.], [ 2., 3.], [ 2., 3.], [ 4., 5.], [ 4., 5.], [ 6., 7.], [ 6., 7.]]) """ x = np.asarray(x) ufd = _UpFIRDn(h, x.dtype, up, down) # This is equivalent to (but faster than) using np.apply_along_axis return ufd.apply_filter(x, axis, mode, cval)