# Licensed under a 3-clause BSD style license - see LICENSE.rst import pytest import numpy as np from astropy.table import Table, QTable, NdarrayMixin, Column from astropy.table.table_helpers import simple_table from astropy import units as u from astropy.coordinates import (SkyCoord, Latitude, Longitude, Angle, EarthLocation, SphericalRepresentation, CartesianRepresentation, SphericalCosLatDifferential) from astropy.time import Time, TimeDelta from astropy.units import allclose as quantity_allclose from astropy.units.quantity import QuantityInfo from astropy.utils.exceptions import AstropyUserWarning from astropy.utils.data import get_pkg_data_filename from astropy.utils.misc import _NOT_OVERWRITING_MSG_MATCH from astropy.io.misc.hdf5 import meta_path from astropy.utils.compat.optional_deps import HAS_H5PY # noqa if HAS_H5PY: import h5py ALL_DTYPES = [np.uint8, np.uint16, np.uint32, np.uint64, np.int8, np.int16, np.int32, np.int64, np.float32, np.float64, np.bool_, '|S3'] def _default_values(dtype): if dtype == np.bool_: return [0, 1, 1] elif dtype == '|S3': return [b'abc', b'def', b'ghi'] else: return [1, 2, 3] @pytest.mark.skipif('not HAS_H5PY') def test_write_nopath(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) with pytest.warns(UserWarning, match="table path was not set via the path= argument"): t1.write(test_file) t1 = Table.read(test_file, path='__astropy_table__') @pytest.mark.skipif('not HAS_H5PY') def test_write_nopath_nonempty(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='bubu') with pytest.raises(ValueError) as exc: t1.write(test_file, append=True) assert 'table path should always be set via the path=' in exc.value.args[0] @pytest.mark.skipif('not HAS_H5PY') def test_read_notable_nopath(tmpdir): test_file = str(tmpdir.join('test.hdf5')) h5py.File(test_file, 'w').close() # create empty file with pytest.raises(ValueError, match='no table found in HDF5 group /'): Table.read(test_file, path='/', format='hdf5') @pytest.mark.skipif('not HAS_H5PY') def test_read_nopath(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path="the_table") t2 = Table.read(test_file) assert np.all(t1['a'] == t2['a']) @pytest.mark.skipif('not HAS_H5PY') def test_read_nopath_multi_tables(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path="the_table") t1.write(test_file, path="the_table_but_different", append=True, overwrite=True) with pytest.warns(AstropyUserWarning, match=r"path= was not specified but multiple tables"): t2 = Table.read(test_file) assert np.all(t1['a'] == t2['a']) @pytest.mark.skipif('not HAS_H5PY') def test_write_invalid_path(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) with pytest.raises(ValueError) as exc: t1.write(test_file, path='test/') assert exc.value.args[0] == "table path should end with table name, not /" @pytest.mark.skipif('not HAS_H5PY') def test_read_invalid_path(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='the_table') with pytest.raises(OSError) as exc: Table.read(test_file, path='test/') assert exc.value.args[0] == "Path test/ does not exist" @pytest.mark.skipif('not HAS_H5PY') def test_read_missing_group(tmpdir): test_file = str(tmpdir.join('test.hdf5')) h5py.File(test_file, 'w').close() # create empty file with pytest.raises(OSError) as exc: Table.read(test_file, path='test/path/table') assert exc.value.args[0] == "Path test/path/table does not exist" @pytest.mark.skipif('not HAS_H5PY') def test_read_missing_table(tmpdir): test_file = str(tmpdir.join('test.hdf5')) with h5py.File(test_file, 'w') as f: f.create_group('test').create_group('path') with pytest.raises(OSError) as exc: Table.read(test_file, path='test/path/table') assert exc.value.args[0] == "Path test/path/table does not exist" @pytest.mark.skipif('not HAS_H5PY') def test_read_missing_group_fileobj(tmpdir): test_file = str(tmpdir.join('test.hdf5')) with h5py.File(test_file, 'w') as f: with pytest.raises(OSError) as exc: Table.read(f, path='test/path/table') assert exc.value.args[0] == "Path test/path/table does not exist" @pytest.mark.skipif('not HAS_H5PY') def test_read_write_simple(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='the_table') t2 = Table.read(test_file, path='the_table') assert np.all(t2['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_read_write_existing_table(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='the_table') with pytest.raises(OSError) as exc: t1.write(test_file, path='the_table', append=True) assert exc.value.args[0] == "Table the_table already exists" @pytest.mark.skipif('not HAS_H5PY') def test_read_write_memory(tmpdir): with h5py.File('test', 'w', driver='core', backing_store=False) as output_file: t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(output_file, path='the_table') t2 = Table.read(output_file, path='the_table') assert np.all(t2['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_read_write_existing(tmpdir): test_file = str(tmpdir.join('test.hdf5')) h5py.File(test_file, 'w').close() # create empty file t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) with pytest.raises(OSError, match=_NOT_OVERWRITING_MSG_MATCH): t1.write(test_file, path='the_table') @pytest.mark.skipif('not HAS_H5PY') def test_read_write_existing_overwrite(tmpdir): test_file = str(tmpdir.join('test.hdf5')) h5py.File(test_file, 'w').close() # create empty file t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='the_table', overwrite=True) t2 = Table.read(test_file, path='the_table') assert np.all(t2['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_read_write_existing_append(tmpdir): test_file = str(tmpdir.join('test.hdf5')) h5py.File(test_file, 'w').close() # create empty file t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='the_table_1', append=True) t1.write(test_file, path='the_table_2', append=True) t2 = Table.read(test_file, path='the_table_1') assert np.all(t2['a'] == [1, 2, 3]) t3 = Table.read(test_file, path='the_table_2') assert np.all(t3['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_read_write_existing_append_groups(tmpdir): test_file = str(tmpdir.join('test.hdf5')) with h5py.File(test_file, 'w') as f: f.create_group('test_1') t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='test_1/the_table_1', append=True) t1.write(test_file, path='test_2/the_table_2', append=True) t2 = Table.read(test_file, path='test_1/the_table_1') assert np.all(t2['a'] == [1, 2, 3]) t3 = Table.read(test_file, path='test_2/the_table_2') assert np.all(t3['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_read_write_existing_append_overwrite(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='table1') t1.write(test_file, path='table2', append=True) t1v2 = Table() t1v2.add_column(Column(name='a', data=[4, 5, 6])) with pytest.raises(OSError) as exc: t1v2.write(test_file, path='table1', append=True) assert exc.value.args[0] == 'Table table1 already exists' t1v2.write(test_file, path='table1', append=True, overwrite=True) t2 = Table.read(test_file, path='table1') assert np.all(t2['a'] == [4, 5, 6]) t3 = Table.read(test_file, path='table2') assert np.all(t3['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_read_fileobj(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='the_table') import h5py with h5py.File(test_file, 'r') as input_file: t2 = Table.read(input_file, path='the_table') assert np.all(t2['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_read_filobj_path(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='path/to/data/the_table') import h5py with h5py.File(test_file, 'r') as input_file: t2 = Table.read(input_file, path='path/to/data/the_table') assert np.all(t2['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_read_filobj_group_path(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(test_file, path='path/to/data/the_table') import h5py with h5py.File(test_file, 'r') as input_file: t2 = Table.read(input_file['path/to'], path='data/the_table') assert np.all(t2['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_read_wrong_fileobj(): class FakeFile: def read(self): pass f = FakeFile() with pytest.raises(TypeError, match='h5py can only open regular files'): Table.read(f, format='hdf5') @pytest.mark.skipif('not HAS_H5PY') def test_write_fileobj(tmpdir): test_file = str(tmpdir.join('test.hdf5')) import h5py with h5py.File(test_file, 'w') as output_file: t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(output_file, path='the_table') t2 = Table.read(test_file, path='the_table') assert np.all(t2['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_write_create_dataset_kwargs(tmpdir): test_file = str(tmpdir.join('test.hdf5')) the_path = 'the_table' import h5py with h5py.File(test_file, 'w') as output_file: t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(output_file, path=the_path, maxshape=(None, )) # A roundabout way of checking this, but the table created above should be # resizable if the kwarg was passed through successfully t2 = Table() t2.add_column(Column(name='a', data=[4, 5])) with h5py.File(test_file, 'a') as output_file: output_file[the_path].resize((len(t1) + len(t2), )) output_file[the_path][len(t1):] = t2.as_array() t3 = Table.read(test_file, path='the_table') assert np.all(t3['a'] == [1, 2, 3, 4, 5]) @pytest.mark.skipif('not HAS_H5PY') def test_write_filobj_group(tmpdir): test_file = str(tmpdir.join('test.hdf5')) import h5py with h5py.File(test_file, 'w') as output_file: t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(output_file, path='path/to/data/the_table') t2 = Table.read(test_file, path='path/to/data/the_table') assert np.all(t2['a'] == [1, 2, 3]) @pytest.mark.skipif('not HAS_H5PY') def test_write_wrong_type(): t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) with pytest.raises(TypeError) as exc: t1.write(1212, path='path/to/data/the_table', format='hdf5') assert exc.value.args[0] == ('output should be a string ' 'or an h5py File or Group object') @pytest.mark.skipif('not HAS_H5PY') @pytest.mark.parametrize(('dtype'), ALL_DTYPES) def test_preserve_single_dtypes(tmpdir, dtype): test_file = str(tmpdir.join('test.hdf5')) values = _default_values(dtype) t1 = Table() t1.add_column(Column(name='a', data=np.array(values, dtype=dtype))) t1.write(test_file, path='the_table') t2 = Table.read(test_file, path='the_table') assert np.all(t2['a'] == values) assert t2['a'].dtype == dtype @pytest.mark.skipif('not HAS_H5PY') def test_preserve_all_dtypes(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() for dtype in ALL_DTYPES: values = _default_values(dtype) t1.add_column(Column(name=str(dtype), data=np.array(values, dtype=dtype))) t1.write(test_file, path='the_table') t2 = Table.read(test_file, path='the_table') for dtype in ALL_DTYPES: values = _default_values(dtype) assert np.all(t2[str(dtype)] == values) assert t2[str(dtype)].dtype == dtype @pytest.mark.skipif('not HAS_H5PY') def test_preserve_meta(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.meta['a'] = 1 t1.meta['b'] = 'hello' t1.meta['c'] = 3.14159 t1.meta['d'] = True t1.meta['e'] = np.array([1, 2, 3]) t1.write(test_file, path='the_table') t2 = Table.read(test_file, path='the_table') for key in t1.meta: assert np.all(t1.meta[key] == t2.meta[key]) @pytest.mark.skipif('not HAS_H5PY') def test_preserve_serialized(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1['a'] = Column(data=[1, 2, 3], unit="s") t1['a'].meta['a0'] = "A0" t1['a'].meta['a1'] = {"a1": [0, 1]} t1['a'].format = '7.3f' t1['a'].description = 'A column' t1.meta['b'] = 1 t1.meta['c'] = {"c0": [0, 1]} t1.write(test_file, path='the_table', serialize_meta=True, overwrite=True) t2 = Table.read(test_file, path='the_table') assert t1['a'].unit == t2['a'].unit assert t1['a'].format == t2['a'].format assert t1['a'].description == t2['a'].description assert t1['a'].meta == t2['a'].meta assert t1.meta == t2.meta # Check that the meta table is fixed-width bytes (see #11299) h5 = h5py.File(test_file, 'r') meta_lines = h5[meta_path('the_table')] assert meta_lines.dtype.kind == 'S' @pytest.mark.skipif('not HAS_H5PY') def test_preserve_serialized_old_meta_format(tmpdir): """Test the old meta format Only for some files created prior to v4.0, in compatibility mode. """ test_file = get_pkg_data_filename('data/old_meta_example.hdf5') t1 = Table() t1['a'] = Column(data=[1, 2, 3], unit="s") t1['a'].meta['a0'] = "A0" t1['a'].meta['a1'] = {"a1": [0, 1]} t1['a'].format = '7.3f' t1['a'].description = 'A column' t1.meta['b'] = 1 t1.meta['c'] = {"c0": [0, 1]} t2 = Table.read(test_file, path='the_table') assert t1['a'].unit == t2['a'].unit assert t1['a'].format == t2['a'].format assert t1['a'].description == t2['a'].description assert t1['a'].meta == t2['a'].meta assert t1.meta == t2.meta @pytest.mark.skipif('not HAS_H5PY') def test_preserve_serialized_in_complicated_path(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1['a'] = Column(data=[1, 2, 3], unit="s") t1['a'].meta['a0'] = "A0" t1['a'].meta['a1'] = {"a1": [0, 1]} t1['a'].format = '7.3f' t1['a'].description = 'A column' t1.meta['b'] = 1 t1.meta['c'] = {"c0": [0, 1]} t1.write(test_file, path='the_table/complicated/path', serialize_meta=True, overwrite=True) t2 = Table.read(test_file, path='the_table/complicated/path') assert t1['a'].format == t2['a'].format assert t1['a'].unit == t2['a'].unit assert t1['a'].description == t2['a'].description assert t1['a'].meta == t2['a'].meta assert t1.meta == t2.meta @pytest.mark.skipif('not HAS_H5PY') def test_metadata_very_large(tmpdir): """Test that very large datasets work, now!""" test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1['a'] = Column(data=[1, 2, 3], unit="s") t1['a'].meta['a0'] = "A0" t1['a'].meta['a1'] = {"a1": [0, 1]} t1['a'].format = '7.3f' t1['a'].description = 'A column' t1.meta['b'] = 1 t1.meta['c'] = {"c0": [0, 1]} t1.meta["meta_big"] = "0" * (2 ** 16 + 1) t1.meta["meta_biggerstill"] = "0" * (2 ** 18) t1.write(test_file, path='the_table', serialize_meta=True, overwrite=True) t2 = Table.read(test_file, path='the_table') assert t1['a'].unit == t2['a'].unit assert t1['a'].format == t2['a'].format assert t1['a'].description == t2['a'].description assert t1['a'].meta == t2['a'].meta assert t1.meta == t2.meta @pytest.mark.skipif('not HAS_H5PY') def test_skip_meta(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.meta['a'] = 1 t1.meta['b'] = 'hello' t1.meta['c'] = 3.14159 t1.meta['d'] = True t1.meta['e'] = np.array([1, 2, 3]) t1.meta['f'] = str wtext = f"Attribute `f` of type {type(t1.meta['f'])} cannot be written to HDF5 files - skipping" with pytest.warns(AstropyUserWarning, match=wtext) as w: t1.write(test_file, path='the_table') assert len(w) == 1 @pytest.mark.skipif('not HAS_H5PY') def test_fail_meta_serialize(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.meta['f'] = str with pytest.raises(Exception) as err: t1.write(test_file, path='the_table', serialize_meta=True) assert "cannot represent an object" in str(err.value) assert "" in str(err.value) @pytest.mark.skipif('not HAS_H5PY') def test_read_h5py_objects(tmpdir): # Regression test - ensure that Datasets are recognized automatically test_file = str(tmpdir.join('test.hdf5')) import h5py with h5py.File(test_file, 'w') as output_file: t1 = Table() t1.add_column(Column(name='a', data=[1, 2, 3])) t1.write(output_file, path='the_table') f = h5py.File(test_file, mode='r') t2 = Table.read(f, path='the_table') assert np.all(t2['a'] == [1, 2, 3]) t3 = Table.read(f['/'], path='the_table') assert np.all(t3['a'] == [1, 2, 3]) t4 = Table.read(f['the_table']) assert np.all(t4['a'] == [1, 2, 3]) f.close() # don't raise an error in 'test --open-files' @pytest.mark.skipif('not HAS_H5PY') def test_read_write_unicode_to_hdf5(tmpdir): test_file = str(tmpdir.join('test.hdf5')) t = Table() t['p'] = ['a', 'b', 'c'] t['q'] = [1, 2, 3] t['r'] = [b'a', b'b', b'c'] t['s'] = ["\u2119", "\u01b4", "\u2602"] t.write(test_file, path='the_table', overwrite=True) t1 = Table.read(test_file, path='the_table', character_as_bytes=False) for col, col1 in zip(t.itercols(), t1.itercols()): assert np.all(col == col1) assert np.all(t1['p'].info.dtype.kind == "U") assert np.all(t1['q'].info.dtype.kind == "i") assert np.all(t1['r'].info.dtype.kind == "U") assert np.all(t1['s'].info.dtype.kind == "U") # Test default (character_as_bytes=True) t2 = Table.read(test_file, path='the_table') for col, col1 in zip(t.itercols(), t2.itercols()): assert np.all(col == col1) assert np.all(t2['p'].info.dtype.kind == "S") assert np.all(t2['q'].info.dtype.kind == "i") assert np.all(t2['r'].info.dtype.kind == "S") assert np.all(t2['s'].info.dtype.kind == "S") def assert_objects_equal(obj1, obj2, attrs, compare_class=True): if compare_class: assert obj1.__class__ is obj2.__class__ info_attrs = ['info.name', 'info.format', 'info.unit', 'info.description', 'info.meta'] for attr in attrs + info_attrs: a1 = obj1 a2 = obj2 for subattr in attr.split('.'): try: a1 = getattr(a1, subattr) a2 = getattr(a2, subattr) except AttributeError: a1 = a1[subattr] a2 = a2[subattr] # Mixin info.meta can None instead of empty OrderedDict(), #6720 would # fix this. if attr == 'info.meta': if a1 is None: a1 = {} if a2 is None: a2 = {} if isinstance(a1, np.ndarray) and a1.dtype.kind == 'f': assert quantity_allclose(a1, a2, rtol=1e-15) else: assert np.all(a1 == a2) # Testing HDF5 table read/write with mixins. This is mostly # copied from FITS mixin testing, and it might be good to unify it. # Analogous tests also exist for ECSV. el = EarthLocation(x=1 * u.km, y=3 * u.km, z=5 * u.km) el2 = EarthLocation(x=[1, 2] * u.km, y=[3, 4] * u.km, z=[5, 6] * u.km) sr = SphericalRepresentation( [0, 1]*u.deg, [2, 3]*u.deg, 1*u.kpc) cr = CartesianRepresentation( [0, 1]*u.pc, [4, 5]*u.pc, [8, 6]*u.pc) sd = SphericalCosLatDifferential( [0, 1]*u.mas/u.yr, [0, 1]*u.mas/u.yr, 10*u.km/u.s) srd = SphericalRepresentation(sr, differentials=sd) sc = SkyCoord([1, 2], [3, 4], unit='deg,deg', frame='fk4', obstime='J1990.5') scd = SkyCoord([1, 2], [3, 4], [5, 6], unit='deg,deg,m', frame='fk4', obstime=['J1990.5', 'J1991.5']) scdc = scd.copy() scdc.representation_type = 'cartesian' scpm = SkyCoord([1, 2], [3, 4], [5, 6], unit='deg,deg,pc', pm_ra_cosdec=[7, 8]*u.mas/u.yr, pm_dec=[9, 10]*u.mas/u.yr) scpmrv = SkyCoord([1, 2], [3, 4], [5, 6], unit='deg,deg,pc', pm_ra_cosdec=[7, 8]*u.mas/u.yr, pm_dec=[9, 10]*u.mas/u.yr, radial_velocity=[11, 12]*u.km/u.s) scrv = SkyCoord([1, 2], [3, 4], [5, 6], unit='deg,deg,pc', radial_velocity=[11, 12]*u.km/u.s) tm = Time([2450814.5, 2450815.5], format='jd', scale='tai', location=el) # NOTE: in the test below the name of the column "x" for the Quantity is # important since it tests the fix for #10215 (namespace clash, where "x" # clashes with "el2.x"). mixin_cols = { 'tm': tm, 'dt': TimeDelta([1, 2] * u.day), 'sc': sc, 'scd': scd, 'scdc': scdc, 'scpm': scpm, 'scpmrv': scpmrv, 'scrv': scrv, 'x': [1, 2] * u.m, 'qdb': [10, 20] * u.dB(u.mW), 'qdex': [4.5, 5.5] * u.dex(u.cm/u.s**2), 'qmag': [21, 22] * u.ABmag, 'lat': Latitude([1, 2] * u.deg), 'lon': Longitude([1, 2] * u.deg, wrap_angle=180. * u.deg), 'ang': Angle([1, 2] * u.deg), 'el2': el2, 'sr': sr, 'cr': cr, 'sd': sd, 'srd': srd, } time_attrs = ['value', 'shape', 'format', 'scale', 'location'] compare_attrs = { 'c1': ['data'], 'c2': ['data'], 'tm': time_attrs, 'dt': ['shape', 'value', 'format', 'scale'], 'sc': ['ra', 'dec', 'representation_type', 'frame.name'], 'scd': ['ra', 'dec', 'distance', 'representation_type', 'frame.name'], 'scdc': ['x', 'y', 'z', 'representation_type', 'frame.name'], 'scpm': ['ra', 'dec', 'distance', 'pm_ra_cosdec', 'pm_dec', 'representation_type', 'frame.name'], 'scpmrv': ['ra', 'dec', 'distance', 'pm_ra_cosdec', 'pm_dec', 'radial_velocity', 'representation_type', 'frame.name'], 'scrv': ['ra', 'dec', 'distance', 'radial_velocity', 'representation_type', 'frame.name'], 'x': ['value', 'unit'], 'qdb': ['value', 'unit'], 'qdex': ['value', 'unit'], 'qmag': ['value', 'unit'], 'lon': ['value', 'unit', 'wrap_angle'], 'lat': ['value', 'unit'], 'ang': ['value', 'unit'], 'el2': ['x', 'y', 'z', 'ellipsoid'], 'nd': ['x', 'y', 'z'], 'sr': ['lon', 'lat', 'distance'], 'cr': ['x', 'y', 'z'], 'sd': ['d_lon_coslat', 'd_lat', 'd_distance'], 'srd': ['lon', 'lat', 'distance', 'differentials.s.d_lon_coslat', 'differentials.s.d_lat', 'differentials.s.d_distance'], } @pytest.mark.skipif('not HAS_H5PY') def test_hdf5_mixins_qtable_to_table(tmpdir): """Test writing as QTable and reading as Table. Ensure correct classes come out. """ filename = str(tmpdir.join('test_simple.hdf5')) names = sorted(mixin_cols) t = QTable([mixin_cols[name] for name in names], names=names) t.write(filename, format='hdf5', path='root', serialize_meta=True) t2 = Table.read(filename, format='hdf5', path='root') assert t.colnames == t2.colnames for name, col in t.columns.items(): col2 = t2[name] # Special-case Time, which does not yet support round-tripping # the format. if isinstance(col2, Time): col2.format = col.format attrs = compare_attrs[name] compare_class = True if isinstance(col.info, QuantityInfo): # Downgrade Quantity to Column + unit assert type(col2) is Column # Class-specific attributes like `value` or `wrap_angle` are lost. attrs = ['unit'] compare_class = False # Compare data values here (assert_objects_equal doesn't know how in this case) assert np.all(col.value == col2) assert_objects_equal(col, col2, attrs, compare_class) @pytest.mark.skipif('not HAS_H5PY') @pytest.mark.parametrize('table_cls', (Table, QTable)) def test_hdf5_mixins_as_one(table_cls, tmpdir): """Test write/read all cols at once and validate intermediate column names""" filename = str(tmpdir.join('test_simple.hdf5')) names = sorted(mixin_cols) serialized_names = ['ang', 'cr.x', 'cr.y', 'cr.z', 'dt.jd1', 'dt.jd2', 'el2.x', 'el2.y', 'el2.z', 'lat', 'lon', 'qdb', 'qdex', 'qmag', 'sc.ra', 'sc.dec', 'scd.ra', 'scd.dec', 'scd.distance', 'scd.obstime.jd1', 'scd.obstime.jd2', 'scdc.x', 'scdc.y', 'scdc.z', 'scdc.obstime.jd1', 'scdc.obstime.jd2', 'scpm.ra', 'scpm.dec', 'scpm.distance', 'scpm.pm_ra_cosdec', 'scpm.pm_dec', 'scpmrv.ra', 'scpmrv.dec', 'scpmrv.distance', 'scpmrv.pm_ra_cosdec', 'scpmrv.pm_dec', 'scpmrv.radial_velocity', 'scrv.ra', 'scrv.dec', 'scrv.distance', 'scrv.radial_velocity', 'sd.d_lon_coslat', 'sd.d_lat', 'sd.d_distance', 'sr.lon', 'sr.lat', 'sr.distance', 'srd.lon', 'srd.lat', 'srd.distance', 'srd.differentials.s.d_lon_coslat', 'srd.differentials.s.d_lat', 'srd.differentials.s.d_distance', 'tm.jd1', 'tm.jd2', 'x', ] t = table_cls([mixin_cols[name] for name in names], names=names) t.meta['C'] = 'spam' t.meta['comments'] = ['this', 'is', 'a', 'comment'] t.meta['history'] = ['first', 'second', 'third'] t.write(filename, format="hdf5", path='root', serialize_meta=True) t2 = table_cls.read(filename, format='hdf5', path='root') assert t2.meta['C'] == 'spam' assert t2.meta['comments'] == ['this', 'is', 'a', 'comment'] assert t2.meta['history'] == ['first', 'second', 'third'] assert t.colnames == t2.colnames # Read directly via hdf5 and confirm column names h5 = h5py.File(filename, 'r') assert list(h5['root'].dtype.names) == serialized_names h5.close() @pytest.mark.skipif('not HAS_H5PY') @pytest.mark.parametrize('name_col', list(mixin_cols.items())) @pytest.mark.parametrize('table_cls', (Table, QTable)) def test_hdf5_mixins_per_column(table_cls, name_col, tmpdir): """Test write/read one col at a time and do detailed validation""" filename = str(tmpdir.join('test_simple.hdf5')) name, col = name_col c = [1.0, 2.0] t = table_cls([c, col, c], names=['c1', name, 'c2']) t[name].info.description = 'my description' t[name].info.meta = {'list': list(range(50)), 'dict': {'a': 'b' * 200}} if not t.has_mixin_columns: pytest.skip('column is not a mixin (e.g. Quantity subclass in Table)') if isinstance(t[name], NdarrayMixin): pytest.xfail('NdarrayMixin not supported') t.write(filename, format="hdf5", path='root', serialize_meta=True) t2 = table_cls.read(filename, format='hdf5', path='root') assert t.colnames == t2.colnames for colname in t.colnames: assert_objects_equal(t[colname], t2[colname], compare_attrs[colname]) # Special case to make sure Column type doesn't leak into Time class data if name.startswith('tm'): assert t2[name]._time.jd1.__class__ is np.ndarray assert t2[name]._time.jd2.__class__ is np.ndarray @pytest.mark.skipif('not HAS_H5PY') def test_round_trip_masked_table_default(tmpdir): """Test round-trip of MaskedColumn through HDF5 using default serialization that writes a separate mask column. Note: >>> simple_table(masked=True) a b c int64 float64 str1 ----- ------- ---- -- 1.0 c 2 2.0 -- 3 -- e """ filename = str(tmpdir.join('test.h5')) t = simple_table(masked=True) # int, float, and str cols with one masked element t['c'] = [b'c', b'd', b'e'] t['c'].mask[1] = True t.write(filename, format='hdf5', path='root', serialize_meta=True) t2 = Table.read(filename) assert t2.masked is False assert t2.colnames == t.colnames for name in t2.colnames: assert np.all(t2[name].mask == t[name].mask) assert np.all(t2[name] == t[name]) # Data under the mask round-trips also (unmask data to show this). t[name].mask = False t2[name].mask = False assert np.all(t2[name] == t[name]) @pytest.mark.skipif('not HAS_H5PY') def test_overwrite_serialized_meta(): # This used to cause an error because the meta data table # was not removed from the existing file. with h5py.File('test_data.h5', 'w', driver='core', backing_store=False) as out: t1 = Table() t1.add_column(Column(data=[4, 8, 15], unit='cm')) t1.write(out, path='data', serialize_meta=True) t2 = Table.read(out, path='data') assert all(t1 == t2) assert t1.info(out=None) == t2.info(out=None) t3 = Table() t3.add_column(Column(data=[16, 23, 42], unit='g')) t3.write(out, path='data', serialize_meta=True, append=True, overwrite=True) t2 = Table.read(out, path='data') assert all(t3 == t2) assert t3.info(out=None) == t2.info(out=None)