from __future__ import annotations from functools import partial from typing import ClassVar import numpy as np import pandas as pd from ..utils import derived_from def maybe_wrap_pandas(obj, x): if isinstance(x, np.ndarray): if isinstance(obj, pd.Series): return pd.Series(x, index=obj.index, dtype=x.dtype) return pd.Index(x) return x class Accessor: """ Base class for pandas Accessor objects cat, dt, and str. Notes ----- Subclasses should define ``_accessor_name`` """ _not_implemented: ClassVar[set[str]] = set() def __init__(self, series): from .core import Series if not isinstance(series, Series): raise ValueError("Accessor cannot be initialized") series_meta = series._meta if hasattr(series_meta, "to_series"): # is index-like series_meta = series_meta.to_series() meta = getattr(series_meta, self._accessor_name) self._meta = meta self._series = series @staticmethod def _delegate_property(obj, accessor, attr): out = getattr(getattr(obj, accessor, obj), attr) return maybe_wrap_pandas(obj, out) @staticmethod def _delegate_method(obj, accessor, attr, args, kwargs): out = getattr(getattr(obj, accessor, obj), attr)(*args, **kwargs) return maybe_wrap_pandas(obj, out) def _property_map(self, attr): meta = self._delegate_property(self._series._meta, self._accessor_name, attr) token = f"{self._accessor_name}-{attr}" return self._series.map_partitions( self._delegate_property, self._accessor_name, attr, token=token, meta=meta ) def _function_map(self, attr, *args, **kwargs): if "meta" in kwargs: meta = kwargs.pop("meta") else: meta = self._delegate_method( self._series._meta_nonempty, self._accessor_name, attr, args, kwargs ) token = f"{self._accessor_name}-{attr}" return self._series.map_partitions( self._delegate_method, self._accessor_name, attr, args, kwargs, meta=meta, token=token, ) @property def _delegates(self): return set(dir(self._meta)) - self._not_implemented def __dir__(self): o = self._delegates o.update(self.__dict__) o.update(dir(type(self))) return list(o) def __getattr__(self, key): if key in self._delegates: if callable(getattr(self._meta, key)): return partial(self._function_map, key) else: return self._property_map(key) else: raise AttributeError(key) class DatetimeAccessor(Accessor): """Accessor object for datetimelike properties of the Series values. Examples -------- >>> s.dt.microsecond # doctest: +SKIP """ _accessor_name = "dt" class StringAccessor(Accessor): """Accessor object for string properties of the Series values. Examples -------- >>> s.str.lower() # doctest: +SKIP """ _accessor_name = "str" _not_implemented = {"get_dummies"} @derived_from(pd.core.strings.StringMethods) def split(self, pat=None, n=-1, expand=False): if expand: if n == -1: raise NotImplementedError( "To use the expand parameter you must specify the number of " "expected splits with the n= parameter. Usually n splits result in n+1 output columns." ) else: delimiter = " " if pat is None else pat meta = self._series._meta._constructor( [delimiter.join(["a"] * (n + 1))], index=self._series._meta_nonempty[:1].index, ) meta = meta.str.split(n=n, expand=expand, pat=pat) else: meta = (self._series.name, object) return self._function_map("split", pat=pat, n=n, expand=expand, meta=meta) @derived_from(pd.core.strings.StringMethods) def cat(self, others=None, sep=None, na_rep=None): from .core import Index, Series if others is None: def str_cat_none(x): if isinstance(x, (Series, Index)): x = x.compute() return x.str.cat(sep=sep, na_rep=na_rep) return self._series.reduction(chunk=str_cat_none, aggregate=str_cat_none) valid_types = (Series, Index, pd.Series, pd.Index) if isinstance(others, valid_types): others = [others] elif not all(isinstance(a, valid_types) for a in others): raise TypeError("others must be Series/Index") return self._series.map_partitions( str_cat, *others, sep=sep, na_rep=na_rep, meta=self._series._meta ) @derived_from(pd.core.strings.StringMethods) def extractall(self, pat, flags=0): return self._series.map_partitions( str_extractall, pat, flags, token="str-extractall" ) def __getitem__(self, index): return self._series.map_partitions(str_get, index, meta=self._series._meta) def str_extractall(series, pat, flags): return series.str.extractall(pat, flags=flags) def str_get(series, index): """Implements series.str[index]""" return series.str[index] def str_cat(self, *others, **kwargs): return self.str.cat(others=others, **kwargs)