import contextlib
import gzip
import itertools
import math
import os.path
import pickle
import re
import shutil
import sys
import tempfile
import warnings
from contextlib import ExitStack
from io import BytesIO
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
import pytest
from pandas.errors import OutOfBoundsDatetime

import xarray as xr
from xarray import (
    DataArray,
    Dataset,
    backends,
    load_dataarray,
    load_dataset,
    open_dataarray,
    open_dataset,
    open_mfdataset,
    save_mfdataset,
)
from xarray.backends.common import robust_getitem
from xarray.backends.h5netcdf_ import H5netcdfBackendEntrypoint
from xarray.backends.netcdf3 import _nc3_dtype_coercions
from xarray.backends.netCDF4_ import (
    NetCDF4BackendEntrypoint,
    _extract_nc4_variable_encoding,
)
from xarray.backends.pydap_ import PydapDataStore
from xarray.backends.scipy_ import ScipyBackendEntrypoint
from xarray.coding.variables import SerializationWarning
from xarray.conventions import encode_dataset_coordinates
from xarray.core import indexing
from xarray.core.options import set_options
from xarray.core.pycompat import dask_array_type
from xarray.tests import LooseVersion, mock

from . import (
    arm_xfail,
    assert_allclose,
    assert_array_equal,
    assert_equal,
    assert_identical,
    has_dask,
    has_netCDF4,
    has_scipy,
    network,
    requires_cfgrib,
    requires_cftime,
    requires_dask,
    requires_fsspec,
    requires_h5netcdf,
    requires_iris,
    requires_netCDF4,
    requires_pseudonetcdf,
    requires_pydap,
    requires_pynio,
    requires_rasterio,
    requires_scipy,
    requires_scipy_or_netCDF4,
    requires_zarr,
)
from .test_coding_times import (
    _ALL_CALENDARS,
    _NON_STANDARD_CALENDARS,
    _STANDARD_CALENDARS,
)
from .test_dataset import create_append_test_data, create_test_data

try:
    import netCDF4 as nc4
except ImportError:
    pass

try:
    import dask
    import dask.array as da
except ImportError:
    pass

ON_WINDOWS = sys.platform == "win32"
default_value = object()


def open_example_dataset(name, *args, **kwargs):
    return open_dataset(
        os.path.join(os.path.dirname(__file__), "data", name), *args, **kwargs
    )


def open_example_mfdataset(names, *args, **kwargs):
    return open_mfdataset(
        [os.path.join(os.path.dirname(__file__), "data", name) for name in names],
        *args,
        **kwargs,
    )


def create_masked_and_scaled_data():
    x = np.array([np.nan, np.nan, 10, 10.1, 10.2], dtype=np.float32)
    encoding = {
        "_FillValue": -1,
        "add_offset": 10,
        "scale_factor": np.float32(0.1),
        "dtype": "i2",
    }
    return Dataset({"x": ("t", x, {}, encoding)})


def create_encoded_masked_and_scaled_data():
    attributes = {"_FillValue": -1, "add_offset": 10, "scale_factor": np.float32(0.1)}
    return Dataset({"x": ("t", np.int16([-1, -1, 0, 1, 2]), attributes)})
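
# Illustrative sketch (not part of the original test suite): how the encoded
# values above map back to create_masked_and_scaled_data(). CF mask-and-scale
# decoding is decoded = add_offset + scale_factor * encoded, with entries equal
# to _FillValue masked to NaN; so int16 [-1, -1, 0, 1, 2] with _FillValue=-1,
# add_offset=10 and scale_factor=0.1 decodes to [nan, nan, 10.0, 10.1, 10.2].
def _sketch_mask_and_scale_decode():
    encoded = np.int16([-1, -1, 0, 1, 2])
    fill_value, add_offset, scale_factor = -1, 10, np.float32(0.1)
    decoded = np.where(
        encoded == fill_value,
        np.float32(np.nan),
        add_offset + scale_factor * encoded.astype(np.float32),
    )
    return decoded  # ~ [nan, nan, 10.0, 10.1, 10.2]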

def create_unsigned_masked_scaled_data():
    encoding = {
        "_FillValue": 255,
        "_Unsigned": "true",
        "dtype": "i1",
        "add_offset": 10,
        "scale_factor": np.float32(0.1),
    }
    x = np.array([10.0, 10.1, 22.7, 22.8, np.nan], dtype=np.float32)
    return Dataset({"x": ("t", x, {}, encoding)})


def create_encoded_unsigned_masked_scaled_data():
    # These are values as written to the file: the _FillValue will
    # be represented in the signed form.
    attributes = {
        "_FillValue": -1,
        "_Unsigned": "true",
        "add_offset": 10,
        "scale_factor": np.float32(0.1),
    }
    # Create signed data corresponding to [0, 1, 127, 128, 255] unsigned
    sb = np.asarray([0, 1, 127, -128, -1], dtype="i1")
    return Dataset({"x": ("t", sb, attributes)})


def create_bad_unsigned_masked_scaled_data():
    encoding = {
        "_FillValue": 255,
        "_Unsigned": True,
        "dtype": "i1",
        "add_offset": 10,
        "scale_factor": np.float32(0.1),
    }
    x = np.array([10.0, 10.1, 22.7, 22.8, np.nan], dtype=np.float32)
    return Dataset({"x": ("t", x, {}, encoding)})


def create_bad_encoded_unsigned_masked_scaled_data():
    # These are values as written to the file: the _FillValue will
    # be represented in the signed form.
    attributes = {
        "_FillValue": -1,
        "_Unsigned": True,
        "add_offset": 10,
        "scale_factor": np.float32(0.1),
    }
    # Create signed data corresponding to [0, 1, 127, 128, 255] unsigned
    sb = np.asarray([0, 1, 127, -128, -1], dtype="i1")
    return Dataset({"x": ("t", sb, attributes)})


def create_signed_masked_scaled_data():
    encoding = {
        "_FillValue": -127,
        "_Unsigned": "false",
        "dtype": "i1",
        "add_offset": 10,
        "scale_factor": np.float32(0.1),
    }
    x = np.array([-1.0, 10.1, 22.7, np.nan], dtype=np.float32)
    return Dataset({"x": ("t", x, {}, encoding)})


def create_encoded_signed_masked_scaled_data():
    # These are values as written to the file: the _FillValue will
    # be represented in the signed form.
    attributes = {
        "_FillValue": -127,
        "_Unsigned": "false",
        "add_offset": 10,
        "scale_factor": np.float32(0.1),
    }
    # Create signed data; -127 is the fill value, so this decodes to
    # [-1.0, 10.1, 22.7, nan]
    sb = np.asarray([-110, 1, 127, -127], dtype="i1")
    return Dataset({"x": ("t", sb, attributes)})
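
# Illustrative sketch (not part of the original test suite): with
# _Unsigned="true", the signed bytes stored in the file are reinterpreted as
# unsigned before mask-and-scale decoding. The i1 values [0, 1, 127, -128, -1]
# above view as u1 [0, 1, 127, 128, 255]; 255 matches the unsigned fill value
# and is masked, and the rest decode via 10 + 0.1 * u, giving
# [10.0, 10.1, 22.7, 22.8, nan].
def _sketch_unsigned_reinterpretation():
    sb = np.asarray([0, 1, 127, -128, -1], dtype="i1")
    ub = sb.view("u1")  # [0, 1, 127, 128, 255]
    decoded = np.where(
        ub == np.uint8(255),
        np.float32(np.nan),
        np.float32(10) + np.float32(0.1) * ub.astype(np.float32),
    )
    return decoded  # ~ [10.0, 10.1, 22.7, 22.8, nan]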

def create_boolean_data():
    attributes = {"units": "-"}
    return Dataset({"x": ("t", [True, False, False, True], attributes)})


class TestCommon:
    def test_robust_getitem(self):
        class UnreliableArrayFailure(Exception):
            pass

        class UnreliableArray:
            def __init__(self, array, failures=1):
                self.array = array
                self.failures = failures

            def __getitem__(self, key):
                if self.failures > 0:
                    self.failures -= 1
                    raise UnreliableArrayFailure
                return self.array[key]

        array = UnreliableArray([0])
        with pytest.raises(UnreliableArrayFailure):
            array[0]
        assert array[0] == 0

        actual = robust_getitem(
            array, 0, catch=UnreliableArrayFailure, initial_delay=0
        )
        assert actual == 0


class NetCDF3Only:
    netcdf3_formats = ("NETCDF3_CLASSIC", "NETCDF3_64BIT")

    @requires_scipy
    def test_dtype_coercion_error(self):
        """Failing dtype coercion should lead to an error"""
        for dtype, format in itertools.product(
            _nc3_dtype_coercions, self.netcdf3_formats
        ):
            if dtype == "bool":
                # coerced upcast (bool to int8) ==> can never fail
                continue

            # Using the largest representable value, create some data that will
            # no longer compare equal after the coerced downcast
            maxval = np.iinfo(dtype).max
            x = np.array([0, 1, 2, maxval], dtype=dtype)
            ds = Dataset({"x": ("t", x, {})})

            with create_tmp_file(allow_cleanup_failure=False) as path:
                with pytest.raises(ValueError, match="could not safely cast"):
                    ds.to_netcdf(path, format=format)
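
# Illustrative note (not part of the original test suite): the coercion error
# exercised above comes down to a lossy integer downcast. netCDF3 has no
# 64-bit integer type, so xarray coerces int64 to int32 when writing, and the
# safety check is essentially
#
#     x = np.array([0, 1, 2, np.iinfo("int64").max], dtype="int64")
#     (x.astype("int32").astype("int64") == x).all()  # False -> raise
#
# Small values survive the cast unchanged, which is why only `maxval` triggers
# the "could not safely cast" error.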

class DatasetIOBase:
    engine: Optional[str] = None
    file_format: Optional[str] = None

    def create_store(self):
        raise NotImplementedError()

    @contextlib.contextmanager
    def roundtrip(
        self, data, save_kwargs=None, open_kwargs=None, allow_cleanup_failure=False
    ):
        if save_kwargs is None:
            save_kwargs = {}
        if open_kwargs is None:
            open_kwargs = {}
        with create_tmp_file(allow_cleanup_failure=allow_cleanup_failure) as path:
            self.save(data, path, **save_kwargs)
            with self.open(path, **open_kwargs) as ds:
                yield ds

    @contextlib.contextmanager
    def roundtrip_append(
        self, data, save_kwargs=None, open_kwargs=None, allow_cleanup_failure=False
    ):
        if save_kwargs is None:
            save_kwargs = {}
        if open_kwargs is None:
            open_kwargs = {}
        with create_tmp_file(allow_cleanup_failure=allow_cleanup_failure) as path:
            for i, key in enumerate(data.variables):
                mode = "a" if i > 0 else "w"
                self.save(data[[key]], path, mode=mode, **save_kwargs)
            with self.open(path, **open_kwargs) as ds:
                yield ds

    # The save/open methods may be overridden below
    def save(self, dataset, path, **kwargs):
        return dataset.to_netcdf(
            path, engine=self.engine, format=self.file_format, **kwargs
        )

    @contextlib.contextmanager
    def open(self, path, **kwargs):
        with open_dataset(path, engine=self.engine, **kwargs) as ds:
            yield ds

    def test_zero_dimensional_variable(self):
        expected = create_test_data()
        expected["float_var"] = ([], 1.0e9, {"units": "units of awesome"})
        expected["bytes_var"] = ([], b"foobar")
        expected["string_var"] = ([], "foobar")
        with self.roundtrip(expected) as actual:
            assert_identical(expected, actual)

    def test_write_store(self):
        expected = create_test_data()
        with self.create_store() as store:
            expected.dump_to_store(store)
            # we need to cf decode the store because it has time and
            # non-dimension coordinates
            with xr.decode_cf(store) as actual:
                assert_allclose(expected, actual)

    def check_dtypes_roundtripped(self, expected, actual):
        for k in expected.variables:
            expected_dtype = expected.variables[k].dtype

            # For NetCDF3, the backend should perform dtype coercion
            if (
                isinstance(self, NetCDF3Only)
                and str(expected_dtype) in _nc3_dtype_coercions
            ):
                expected_dtype = np.dtype(_nc3_dtype_coercions[str(expected_dtype)])

            actual_dtype = actual.variables[k].dtype
            # TODO: check expected behavior for string dtypes more carefully
            string_kinds = {"O", "S", "U"}
            assert expected_dtype == actual_dtype or (
                expected_dtype.kind in string_kinds
                and actual_dtype.kind in string_kinds
            )

    def test_roundtrip_test_data(self):
        expected = create_test_data()
        with self.roundtrip(expected) as actual:
            self.check_dtypes_roundtripped(expected, actual)
            assert_identical(expected, actual)

    def test_load(self):
        expected = create_test_data()

        @contextlib.contextmanager
        def assert_loads(vars=None):
            if vars is None:
                vars = expected
            with self.roundtrip(expected) as actual:
                for k, v in actual.variables.items():
                    # IndexVariables are eagerly loaded into memory
                    assert v._in_memory == (k in actual.dims)
                yield actual
                for k, v in actual.variables.items():
                    if k in vars:
                        assert v._in_memory
            assert_identical(expected, actual)

        with pytest.raises(AssertionError):
            # make sure the contextmanager works!
            with assert_loads() as ds:
                pass

        with assert_loads() as ds:
            ds.load()

        with assert_loads(["var1", "dim1", "dim2"]) as ds:
            ds["var1"].load()

        # verify we can read data even after closing the file
        with self.roundtrip(expected) as ds:
            actual = ds.load()
        assert_identical(expected, actual)

    def test_dataset_compute(self):
        expected = create_test_data()

        with self.roundtrip(expected) as actual:
            # Test Dataset.compute()
            for k, v in actual.variables.items():
                # IndexVariables are eagerly cached
                assert v._in_memory == (k in actual.dims)

            computed = actual.compute()

            for k, v in actual.variables.items():
                assert v._in_memory == (k in actual.dims)
            for v in computed.variables.values():
                assert v._in_memory

            assert_identical(expected, actual)
            assert_identical(expected, computed)

    def test_pickle(self):
        if not has_dask:
            pytest.xfail("pickling requires dask for SerializableLock")
        expected = Dataset({"foo": ("x", [42])})
        with self.roundtrip(expected, allow_cleanup_failure=ON_WINDOWS) as roundtripped:
            with roundtripped:
                # Windows doesn't like reopening an already open file
                raw_pickle = pickle.dumps(roundtripped)
            with pickle.loads(raw_pickle) as unpickled_ds:
                assert_identical(expected, unpickled_ds)

    @pytest.mark.filterwarnings("ignore:deallocating CachingFileManager")
    def test_pickle_dataarray(self):
        if not has_dask:
            pytest.xfail("pickling requires dask for SerializableLock")
        expected = Dataset({"foo": ("x", [42])})
        with self.roundtrip(expected, allow_cleanup_failure=ON_WINDOWS) as roundtripped:
            with roundtripped:
                raw_pickle = pickle.dumps(roundtripped["foo"])
            # TODO: figure out how to explicitly close the file for the
            # unpickled DataArray?
            unpickled = pickle.loads(raw_pickle)
            assert_identical(expected["foo"], unpickled)

    def test_dataset_caching(self):
        expected = Dataset({"foo": ("x", [5, 6, 7])})
        with self.roundtrip(expected) as actual:
            assert isinstance(actual.foo.variable._data, indexing.MemoryCachedArray)
            assert not actual.foo.variable._in_memory
            actual.foo.values  # cache
            assert actual.foo.variable._in_memory

        with self.roundtrip(expected, open_kwargs={"cache": False}) as actual:
            assert isinstance(actual.foo.variable._data, indexing.CopyOnWriteArray)
            assert not actual.foo.variable._in_memory
            actual.foo.values  # no caching
            assert not actual.foo.variable._in_memory

    def test_roundtrip_None_variable(self):
        expected = Dataset({None: (("x", "y"), [[0, 1], [2, 3]])})
        with self.roundtrip(expected) as actual:
            assert_identical(expected, actual)

    def test_roundtrip_object_dtype(self):
        floats = np.array([0.0, 0.0, 1.0, 2.0, 3.0], dtype=object)
        floats_nans = np.array([np.nan, np.nan, 1.0, 2.0, 3.0], dtype=object)
        bytes_ = np.array([b"ab", b"cdef", b"g"], dtype=object)
        bytes_nans = np.array([b"ab", b"cdef", np.nan], dtype=object)
        strings = np.array(["ab", "cdef", "g"], dtype=object)
        strings_nans = np.array(["ab", "cdef", np.nan], dtype=object)
        all_nans = np.array([np.nan, np.nan], dtype=object)
        original = Dataset(
            {
                "floats": ("a", floats),
                "floats_nans": ("a", floats_nans),
                "bytes": ("b", bytes_),
                "bytes_nans": ("b", bytes_nans),
                "strings": ("b", strings),
                "strings_nans": ("b", strings_nans),
                "all_nans": ("c", all_nans),
                "nan": ([], np.nan),
            }
        )
        expected = original.copy(deep=True)
        with self.roundtrip(original) as actual:
            try:
                assert_identical(expected, actual)
            except AssertionError:
                # Most stores use '' for nans in strings, but some don't.
                # First try the ideal case (where the store returns exactly
                # the original Dataset), then try a more realistic case.
                # This currently includes all netCDF files when encoding is not
                # explicitly set.
                # https://github.com/pydata/xarray/issues/1647
                expected["bytes_nans"][-1] = b""
                expected["strings_nans"][-1] = ""
                assert_identical(expected, actual)

    def test_roundtrip_string_data(self):
        expected = Dataset({"x": ("t", ["ab", "cdef"])})
        with self.roundtrip(expected) as actual:
            assert_identical(expected, actual)

    def test_roundtrip_string_encoded_characters(self):
        expected = Dataset({"x": ("t", ["ab", "cdef"])})
        expected["x"].encoding["dtype"] = "S1"
        with self.roundtrip(expected) as actual:
            assert_identical(expected, actual)
            assert actual["x"].encoding["_Encoding"] == "utf-8"

        expected["x"].encoding["_Encoding"] = "ascii"
        with self.roundtrip(expected) as actual:
            assert_identical(expected, actual)
            assert actual["x"].encoding["_Encoding"] == "ascii"

    @arm_xfail
    def test_roundtrip_numpy_datetime_data(self):
        times = pd.to_datetime(["2000-01-01", "2000-01-02", "NaT"])
        expected = Dataset({"t": ("t", times), "t0": times[0]})
        kwargs = {"encoding": {"t0": {"units": "days since 1950-01-01"}}}
        with self.roundtrip(expected, save_kwargs=kwargs) as actual:
            assert_identical(expected, actual)
            assert actual.t0.encoding["units"] == "days since 1950-01-01"

    @requires_cftime
    def test_roundtrip_cftime_datetime_data(self):
        from .test_coding_times import _all_cftime_date_types

        date_types = _all_cftime_date_types()
        for date_type in date_types.values():
            times = [date_type(1, 1, 1), date_type(1, 1, 2)]
            expected = Dataset({"t": ("t", times), "t0": times[0]})
            kwargs = {"encoding": {"t0": {"units": "days since 0001-01-01"}}}
            expected_decoded_t = np.array(times)
            expected_decoded_t0 = np.array([date_type(1, 1, 1)])
            expected_calendar = times[0].calendar

            with warnings.catch_warnings():
                if expected_calendar in {"proleptic_gregorian", "gregorian"}:
                    warnings.filterwarnings("ignore", "Unable to decode time axis")

                with self.roundtrip(expected, save_kwargs=kwargs) as actual:
                    abs_diff = abs(actual.t.values - expected_decoded_t)
                    assert (abs_diff <= np.timedelta64(1, "s")).all()
                    assert (
                        actual.t.encoding["units"]
                        == "days since 0001-01-01 00:00:00.000000"
                    )
                    assert actual.t.encoding["calendar"] == expected_calendar

                    abs_diff = abs(actual.t0.values - expected_decoded_t0)
                    assert (abs_diff <= np.timedelta64(1, "s")).all()
                    assert actual.t0.encoding["units"] == "days since 0001-01-01"
                    assert actual.t.encoding["calendar"] == expected_calendar

    def test_roundtrip_timedelta_data(self):
        time_deltas = pd.to_timedelta(["1h", "2h", "NaT"])
        expected = Dataset({"td": ("td", time_deltas), "td0": time_deltas[0]})
        with self.roundtrip(expected) as actual:
            assert_identical(expected, actual)

    def test_roundtrip_float64_data(self):
        expected = Dataset({"x": ("y", np.array([1.0, 2.0, np.pi], dtype="float64"))})
        with self.roundtrip(expected) as actual:
            assert_identical(expected, actual)

    def test_roundtrip_example_1_netcdf(self):
        with open_example_dataset("example_1.nc") as expected:
            with self.roundtrip(expected) as actual:
                # we allow the attributes to differ since that
                # will depend on the encoding used. For example,
                # without CF encoding 'actual' will end up with
                # a dtype attribute.
                assert_equal(expected, actual)

    def test_roundtrip_coordinates(self):
        original = Dataset(
            {"foo": ("x", [0, 1])}, {"x": [2, 3], "y": ("a", [42]), "z": ("x", [4, 5])}
        )

        with self.roundtrip(original) as actual:
            assert_identical(original, actual)

        original["foo"].encoding["coordinates"] = "y"
        with self.roundtrip(original, open_kwargs={"decode_coords": False}) as expected:
            # check roundtripping when decode_coords=False
            with self.roundtrip(
                expected, open_kwargs={"decode_coords": False}
            ) as actual:
                assert_identical(expected, actual)

    def test_roundtrip_global_coordinates(self):
        original = Dataset(
            {"foo": ("x", [0, 1])}, {"x": [2, 3], "y": ("a", [42]), "z": ("x", [4, 5])}
        )
        with self.roundtrip(original) as actual:
            assert_identical(original, actual)

        # test that global "coordinates" is as expected
        _, attrs = encode_dataset_coordinates(original)
        assert attrs["coordinates"] == "y"

        # test warning when global "coordinates" is already set
        original.attrs["coordinates"] = "foo"
        with pytest.warns(SerializationWarning):
            _, attrs = encode_dataset_coordinates(original)
            assert attrs["coordinates"] == "foo"

    def test_roundtrip_coordinates_with_space(self):
        original = Dataset(coords={"x": 0, "y z": 1})
        expected = Dataset({"y z": 1}, {"x": 0})
        with pytest.warns(SerializationWarning):
            with self.roundtrip(original) as actual:
                assert_identical(expected, actual)

    def test_roundtrip_boolean_dtype(self):
        original = create_boolean_data()
        assert original["x"].dtype == "bool"
        with self.roundtrip(original) as actual:
            assert_identical(original, actual)
            assert actual["x"].dtype == "bool"

    def test_orthogonal_indexing(self):
        in_memory = create_test_data()
        with self.roundtrip(in_memory) as on_disk:
            indexers = {"dim1": [1, 2, 0], "dim2": [3, 2, 0, 3], "dim3": np.arange(5)}
            expected = in_memory.isel(**indexers)
            actual = on_disk.isel(**indexers)
            # make sure the array is not yet loaded into memory
            assert not actual["var1"].variable._in_memory
            assert_identical(expected, actual)
            # do it twice, to make sure we're switched from orthogonal -> numpy
            # when we cached the values
            actual = on_disk.isel(**indexers)
            assert_identical(expected, actual)

    def test_vectorized_indexing(self):
        in_memory = create_test_data()
        with self.roundtrip(in_memory) as on_disk:
            indexers = {
                "dim1": DataArray([0, 2, 0], dims="a"),
                "dim2": DataArray([0, 2, 3], dims="a"),
            }
            expected = in_memory.isel(**indexers)
            actual = on_disk.isel(**indexers)
            # make sure the array is not yet loaded into memory
            assert not actual["var1"].variable._in_memory
            assert_identical(expected, actual.load())
            # do it twice, to make sure we're switched from
            # vectorized -> numpy when we cached the values
            actual = on_disk.isel(**indexers)
            assert_identical(expected, actual)

        def multiple_indexing(indexers):
            # make sure a sequence of lazy indexing operations works
            with self.roundtrip(in_memory) as on_disk:
                actual = on_disk["var3"]
                expected = in_memory["var3"]
                for ind in indexers:
                    actual = actual.isel(**ind)
                    expected = expected.isel(**ind)
                    # make sure the array is not yet loaded into memory
                    assert not actual.variable._in_memory
                assert_identical(expected, actual.load())

        # two-staged vectorized-indexing
        indexers = [
            {
                "dim1": DataArray([[0, 7], [2, 6], [3, 5]], dims=["a", "b"]),
                "dim3": DataArray([[0, 4], [1, 3], [2, 2]], dims=["a", "b"]),
            },
            {"a": DataArray([0, 1], dims=["c"]), "b": DataArray([0, 1], dims=["c"])},
        ]
        multiple_indexing(indexers)

        # vectorized-slice mixed
        indexers = [
            {
                "dim1": DataArray([[0, 7], [2, 6], [3, 5]], dims=["a", "b"]),
                "dim3": slice(None, 10),
            }
        ]
        multiple_indexing(indexers)

        # vectorized-integer mixed
        indexers = [
            {"dim3": 0},
            {"dim1": DataArray([[0, 7], [2, 6], [3, 5]], dims=["a", "b"])},
            {"a": slice(None, None, 2)},
        ]
        multiple_indexing(indexers)

        # vectorized-integer mixed
        indexers = [
            {"dim3": 0},
            {"dim1": DataArray([[0, 7], [2, 6], [3, 5]], dims=["a", "b"])},
            {"a": 1, "b": 0},
        ]
        multiple_indexing(indexers)
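    # Illustrative note (not part of the original test suite): the two tests
    # above exercise xarray's two indexing modes. With plain lists/arrays,
    # isel is orthogonal -- each indexer selects independently along its own
    # dimension, so result shapes multiply; with DataArray indexers that share
    # a dimension (dims="a" above), indexing is vectorized/pointwise and the
    # result gains that shared dimension instead. A minimal sketch:
    #
    #     ds = xr.Dataset({"v": (("x", "y"), np.arange(12).reshape(3, 4))})
    #     ds["v"].isel(x=[0, 1], y=[0, 1]).shape  # (2, 2): orthogonal
    #     ds["v"].isel(
    #         x=xr.DataArray([0, 1], dims="p"),
    #         y=xr.DataArray([0, 1], dims="p"),
    #     ).shape  # (2,): pointwise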
    @pytest.mark.xfail(
        reason="zarr without dask handles negative steps in slices incorrectly",
    )
    def test_vectorized_indexing_negative_step(self):
        # use dask explicitly when present
        if has_dask:
            open_kwargs = {"chunks": {}}
        else:
            open_kwargs = None
        in_memory = create_test_data()

        def multiple_indexing(indexers):
            # make sure a sequence of lazy indexing operations works
            with self.roundtrip(in_memory, open_kwargs=open_kwargs) as on_disk:
                actual = on_disk["var3"]
                expected = in_memory["var3"]
                for ind in indexers:
                    actual = actual.isel(**ind)
                    expected = expected.isel(**ind)
                    # make sure the array is not yet loaded into memory
                    assert not actual.variable._in_memory
                assert_identical(expected, actual.load())

        # with negative step slice.
        indexers = [
            {
                "dim1": DataArray([[0, 7], [2, 6], [3, 5]], dims=["a", "b"]),
                "dim3": slice(-1, 1, -1),
            }
        ]
        multiple_indexing(indexers)

        # with negative step slice.
        indexers = [
            {
                "dim1": DataArray([[0, 7], [2, 6], [3, 5]], dims=["a", "b"]),
                "dim3": slice(-1, 1, -2),
            }
        ]
        multiple_indexing(indexers)

    def test_isel_dataarray(self):
        # Make sure isel works lazily. GH issue 1688
        in_memory = create_test_data()
        with self.roundtrip(in_memory) as on_disk:
            expected = in_memory.isel(dim2=in_memory["dim2"] < 3)
            actual = on_disk.isel(dim2=on_disk["dim2"] < 3)
            assert_identical(expected, actual)

    def validate_array_type(self, ds):
        # Make sure that only NumpyIndexingAdapter stores a bare np.ndarray.
        def find_and_validate_array(obj):
            # Recursively check obj, which is an array or an array wrapper.
            if hasattr(obj, "array"):
                if isinstance(obj.array, indexing.ExplicitlyIndexed):
                    find_and_validate_array(obj.array)
                else:
                    if isinstance(obj.array, np.ndarray):
                        assert isinstance(obj, indexing.NumpyIndexingAdapter)
                    elif isinstance(obj.array, dask_array_type):
                        assert isinstance(obj, indexing.DaskIndexingAdapter)
                    elif isinstance(obj.array, pd.Index):
                        assert isinstance(obj, indexing.PandasIndexingAdapter)
                    else:
                        raise TypeError(
                            "{} is wrapped by {}".format(type(obj.array), type(obj))
                        )

        for k, v in ds.variables.items():
            find_and_validate_array(v._data)

    def test_array_type_after_indexing(self):
        in_memory = create_test_data()
        with self.roundtrip(in_memory) as on_disk:
            self.validate_array_type(on_disk)
            indexers = {"dim1": [1, 2, 0], "dim2": [3, 2, 0, 3], "dim3": np.arange(5)}
            expected = in_memory.isel(**indexers)
            actual = on_disk.isel(**indexers)
            assert_identical(expected, actual)
            self.validate_array_type(actual)
            # do it twice, to make sure we're switched from orthogonal -> numpy
            # when we cached the values
            actual = on_disk.isel(**indexers)
            assert_identical(expected, actual)
            self.validate_array_type(actual)

    def test_dropna(self):
        # regression test for GH issue 1694
        a = np.random.randn(4, 3)
        a[1, 1] = np.NaN
        in_memory = xr.Dataset(
            {"a": (("y", "x"), a)}, coords={"y": np.arange(4), "x": np.arange(3)}
        )
        assert_identical(
            in_memory.dropna(dim="x"), in_memory.isel(x=slice(None, None, 2))
        )
        with self.roundtrip(in_memory) as on_disk:
            self.validate_array_type(on_disk)
            expected = in_memory.dropna(dim="x")
            actual = on_disk.dropna(dim="x")
            assert_identical(expected, actual)

    def test_ondisk_after_print(self):
        """Make sure print does not load file into memory"""
        in_memory = create_test_data()
        with self.roundtrip(in_memory) as on_disk:
            repr(on_disk)
            assert not on_disk["var1"]._in_memory


class CFEncodedBase(DatasetIOBase):
    def test_roundtrip_bytes_with_fill_value(self):
        values = np.array([b"ab", b"cdef", np.nan], dtype=object)
        encoding = {"_FillValue": b"X", "dtype": "S1"}
        original = Dataset({"x": ("t", values, {}, encoding)})
        expected = original.copy(deep=True)
        with self.roundtrip(original) as actual:
            assert_identical(expected, actual)

        original = Dataset({"x": ("t", values, {}, {"_FillValue": b""})})
        with self.roundtrip(original) as actual:
            assert_identical(expected, actual)

    def test_roundtrip_string_with_fill_value_nchar(self):
        values = np.array(["ab", "cdef", np.nan], dtype=object)
        expected = Dataset({"x": ("t", values)})

        encoding = {"dtype": "S1", "_FillValue": b"X"}
        original = Dataset({"x": ("t", values, {}, encoding)})
        # Not supported yet.
        with pytest.raises(NotImplementedError):
            with self.roundtrip(original) as actual:
                assert_identical(expected, actual)

    @pytest.mark.parametrize(
        "decoded_fn, encoded_fn",
        [
            (
                create_unsigned_masked_scaled_data,
                create_encoded_unsigned_masked_scaled_data,
            ),
            pytest.param(
                create_bad_unsigned_masked_scaled_data,
                create_bad_encoded_unsigned_masked_scaled_data,
                marks=pytest.mark.xfail(reason="Bad _Unsigned attribute."),
            ),
            (
                create_signed_masked_scaled_data,
                create_encoded_signed_masked_scaled_data,
            ),
            (create_masked_and_scaled_data, create_encoded_masked_and_scaled_data),
        ],
    )
    def test_roundtrip_mask_and_scale(self, decoded_fn, encoded_fn):
        decoded = decoded_fn()
        encoded = encoded_fn()

        with self.roundtrip(decoded) as actual:
            for k in decoded.variables:
                assert decoded.variables[k].dtype == actual.variables[k].dtype
            assert_allclose(decoded, actual, decode_bytes=False)

        with self.roundtrip(decoded, open_kwargs=dict(decode_cf=False)) as actual:
            # TODO: this assumes that all roundtrips will first
            # encode. Is that something we want to test for?
            for k in encoded.variables:
                assert encoded.variables[k].dtype == actual.variables[k].dtype
            assert_allclose(encoded, actual, decode_bytes=False)

        with self.roundtrip(encoded, open_kwargs=dict(decode_cf=False)) as actual:
            for k in encoded.variables:
                assert encoded.variables[k].dtype == actual.variables[k].dtype
            assert_allclose(encoded, actual, decode_bytes=False)

        # make sure roundtrip encoding didn't change the
        # original dataset.
        assert_allclose(encoded, encoded_fn(), decode_bytes=False)

        with self.roundtrip(encoded) as actual:
            for k in decoded.variables:
                assert decoded.variables[k].dtype == actual.variables[k].dtype
            assert_allclose(decoded, actual, decode_bytes=False)

    @staticmethod
    def _create_cf_dataset():
        original = Dataset(
            dict(
                variable=(
                    ("ln_p", "latitude", "longitude"),
                    np.arange(8, dtype="f4").reshape(2, 2, 2),
                    {"ancillary_variables": "std_devs det_lim"},
                ),
                std_devs=(
                    ("ln_p", "latitude", "longitude"),
                    np.arange(0.1, 0.9, 0.1).reshape(2, 2, 2),
                    {"standard_name": "standard_error"},
                ),
                det_lim=(
                    (),
                    0.1,
                    {"standard_name": "detection_minimum"},
                ),
            ),
            dict(
                latitude=("latitude", [0, 1], {"units": "degrees_north"}),
                longitude=("longitude", [0, 1], {"units": "degrees_east"}),
                latlon=((), -1, {"grid_mapping_name": "latitude_longitude"}),
                latitude_bnds=(("latitude", "bnds2"), [[0, 1], [1, 2]]),
                longitude_bnds=(("longitude", "bnds2"), [[0, 1], [1, 2]]),
                areas=(
                    ("latitude", "longitude"),
                    [[1, 1], [1, 1]],
                    {"units": "degree^2"},
                ),
                ln_p=(
                    "ln_p",
                    [1.0, 0.5],
                    {
                        "standard_name": "atmosphere_ln_pressure_coordinate",
                        "computed_standard_name": "air_pressure",
                    },
                ),
                P0=((), 1013.25, {"units": "hPa"}),
            ),
        )
        original["variable"].encoding.update(
            {"cell_measures": "area: areas", "grid_mapping": "latlon"},
        )
        original.coords["latitude"].encoding.update(
            dict(grid_mapping="latlon", bounds="latitude_bnds")
        )
        original.coords["longitude"].encoding.update(
            dict(grid_mapping="latlon", bounds="longitude_bnds")
        )
        original.coords["ln_p"].encoding.update({"formula_terms": "p0: P0 lev : ln_p"})
        return original
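    # Illustrative note (not part of the original test suite): the fixture
    # above wires up the CF linkage attributes that ``decode_coords="all"``
    # resolves: ``bounds`` (latitude_bnds/longitude_bnds), ``grid_mapping``
    # (latlon), ``cell_measures`` (areas) and ``formula_terms`` (P0). With
    # decode_coords="all", variables referenced by these attributes are loaded
    # as coordinates; with the default decoding they come back as data
    # variables, which is what the roundtrip tests below check.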
    def test_grid_mapping_and_bounds_are_not_coordinates_in_file(self):
        original = self._create_cf_dataset()
        with create_tmp_file() as tmp_file:
            original.to_netcdf(tmp_file)
            with open_dataset(tmp_file, decode_coords=False) as ds:
                assert ds.coords["latitude"].attrs["bounds"] == "latitude_bnds"
                assert ds.coords["longitude"].attrs["bounds"] == "longitude_bnds"
                assert "coordinates" not in ds["variable"].attrs
                assert "coordinates" not in ds.attrs

    def test_coordinate_variables_after_dataset_roundtrip(self):
        original = self._create_cf_dataset()
        with self.roundtrip(original, open_kwargs={"decode_coords": "all"}) as actual:
            assert_identical(actual, original)

        with self.roundtrip(original) as actual:
            expected = original.reset_coords(
                ["latitude_bnds", "longitude_bnds", "areas", "P0", "latlon"]
            )
            # equal checks that coords and data_vars are equal, which should
            # be enough; identical would require resetting a number of
            # attributes, so skip that.
            assert_equal(actual, expected)

    def test_grid_mapping_and_bounds_are_coordinates_after_dataarray_roundtrip(self):
        original = self._create_cf_dataset()
        # The DataArray roundtrip should have the same warnings as the
        # Dataset, but we already tested for those, so just go for the
        # new warnings. It would appear that there is no way to tell
        # pytest "This warning and also this warning should both be
        # present".
        # xarray/tests/test_conventions.py::TestCFEncodedDataStore
        # needs the to_dataset. The other backends should be fine
        # without it.
        with pytest.warns(
            UserWarning,
            match=(
                r"Variable\(s\) referenced in bounds not in variables: "
                r"\['l(at|ong)itude_bnds'\]"
            ),
        ):
            with self.roundtrip(
                original["variable"].to_dataset(), open_kwargs={"decode_coords": "all"}
            ) as actual:
                assert_identical(actual, original["variable"].to_dataset())

    @requires_iris
    def test_coordinate_variables_after_iris_roundtrip(self):
        original = self._create_cf_dataset()
        iris_cube = original["variable"].to_iris()
        actual = DataArray.from_iris(iris_cube)
        # Bounds will be missing (xfail)
        del original.coords["latitude_bnds"], original.coords["longitude_bnds"]
        # Ancillary vars will be missing
        # Those are data_vars, and will be dropped when grabbing the variable
        assert_identical(actual, original["variable"])

    def test_coordinates_encoding(self):
        def equals_latlon(obj):
            return obj == "lat lon" or obj == "lon lat"

        original = Dataset(
            {"temp": ("x", [0, 1]), "precip": ("x", [0, -1])},
            {"lat": ("x", [2, 3]), "lon": ("x", [4, 5])},
        )
        with self.roundtrip(original) as actual:
            assert_identical(actual, original)
        with create_tmp_file() as tmp_file:
            original.to_netcdf(tmp_file)
            with open_dataset(tmp_file, decode_coords=False) as ds:
                assert equals_latlon(ds["temp"].attrs["coordinates"])
                assert equals_latlon(ds["precip"].attrs["coordinates"])
                assert "coordinates" not in ds.attrs
                assert "coordinates" not in ds["lat"].attrs
                assert "coordinates" not in ds["lon"].attrs

        modified = original.drop_vars(["temp", "precip"])
        with self.roundtrip(modified) as actual:
            assert_identical(actual, modified)
        with create_tmp_file() as tmp_file:
            modified.to_netcdf(tmp_file)
            with open_dataset(tmp_file, decode_coords=False) as ds:
                assert equals_latlon(ds.attrs["coordinates"])
                assert "coordinates" not in ds["lat"].attrs
                assert "coordinates" not in ds["lon"].attrs

        original["temp"].encoding["coordinates"] = "lat"
        with self.roundtrip(original) as actual:
            assert_identical(actual, original)
        original["precip"].encoding["coordinates"] = "lat"
        with create_tmp_file() as tmp_file:
            original.to_netcdf(tmp_file)
            with open_dataset(tmp_file, decode_coords=True) as ds:
                assert "lon" not in ds["temp"].encoding["coordinates"]
                assert "lon" not in ds["precip"].encoding["coordinates"]
                assert "coordinates" not in ds["lat"].encoding
                assert "coordinates" not in ds["lon"].encoding

    def test_roundtrip_endian(self):
        ds = Dataset(
            {
                "x": np.arange(3, 10, dtype=">i2"),
                "y": np.arange(3, 20, dtype="