from __future__ import annotations
import ast
import base64
import hashlib
import logging
import re
from collections.abc import Hashable, Mapping, Sequence
from typing import Any
import numpy as np
import pandas as pd
import pyarrow as pa
import xarray as xr
from xarray import DataArray, Dataset
from .accessors import register_dataset_method
from .aster import extract_all_name_tokens
from .categorical import _Categorical # noqa
from .table import Table
logger = logging.getLogger("sharrow")
well_known_names = {
"nb",
"np",
"pd",
"xr",
"pa",
"log",
"exp",
"log1p",
"expm1",
"max",
"min",
"piece",
"hard_sigmoid",
"transpose_leading",
"clip",
}
def one_based(n):
return pd.RangeIndex(1, n + 1)
def zero_based(n):
return pd.RangeIndex(0, n)
def clean(s):
"""
Convert any string into a similar python identifier.
If any modification of the string is made, or if the string
is longer than 120 characters, it is truncated and a hash of the
original string is added to the end, to ensure every
string maps to a unique cleaned name.
Parameters
----------
s : str
Returns
-------
cleaned : str
"""
if not isinstance(s, str):
s = f"{type(s)}-{s}"
cleaned = re.sub(r"\W|^(?=\d)", "_", s)
if cleaned != s or len(cleaned) > 120:
# digest size 15 creates a 24 character base32 string
h = base64.b32encode(
hashlib.blake2b(s.encode(), digest_size=15).digest()
).decode()
cleaned = f"{cleaned[:90]}_{h}"
return cleaned
[docs]
def construct(source):
"""
Create Datasets from various similar objects.
Parameters
----------
source : pandas.DataFrame, pyarrow.Table, xarray.Dataset, or Sequence[str]
The source from which to create a Dataset. DataFrames and Tables
are converted to Datasets that have one dimension (the rows) and
separate variables for each of the columns. A list of strings
creates a dataset with those named empty variables.
Returns
-------
Dataset
"""
if isinstance(source, pd.DataFrame):
source = dataset_from_dataframe_fast(source) # xarray default can be slow
elif isinstance(source, (Table, pa.Table)):
source = from_table(source)
elif isinstance(source, xr.Dataset):
pass # don't do the superclass things
elif isinstance(source, Sequence) and all(isinstance(i, str) for i in source):
source = from_table(pa.table({i: [] for i in source}))
else:
source = xr.Dataset(source)
return source
def dataset_from_dataframe_fast(
dataframe: pd.DataFrame,
sparse: bool = False,
preserve_cat: bool = True,
) -> Dataset:
"""Convert a pandas.DataFrame into an xarray.Dataset.
Each column will be converted into an independent variable in the
Dataset. If the dataframe's index is a MultiIndex, it will be expanded
into a tensor product of one-dimensional indices (filling in missing
values with NaN). This method will produce a Dataset very similar to
that on which the 'to_dataframe' method was called, except with
possibly redundant dimensions (since all dataset variables will have
the same dimensionality)
Parameters
----------
dataframe : DataFrame
DataFrame from which to copy data and indices.
sparse : bool, default: False
If true, create a sparse arrays instead of dense numpy arrays. This
can potentially save a large amount of memory if the DataFrame has
a MultiIndex. Requires the sparse package (sparse.pydata.org).
preserve_cat : bool, default True
If true, preserve encoding of categorical columns. Xarray lacks an
official implementation of a categorical datatype, so sharrow's
dictionary-based digital encoding is applied instead. Note that in
native xarray usage, the resulting variable will look like integer
values instead of the category values. The `dataset.cat` accessor
can be used to interact with the categorical data.
Returns
-------
New Dataset.
See Also
--------
xarray.DataArray.from_series
pandas.DataFrame.to_xarray
"""
# this is much faster than the default xarray version when not
# using a MultiIndex.
if isinstance(dataframe.index, pd.MultiIndex) or sparse:
return Dataset.from_dataframe(dataframe, sparse)
if not dataframe.columns.is_unique:
# if the dataframe has non-unique column names, but all the duplicate
# names contain the same data, we can recover safely by dropping the
# duplicates, otherwise throw an error.
cannot_fix = False
dupe_columns = dataframe.columns.duplicated()
dupe_column_names = dataframe.columns[dupe_columns]
for j in dupe_column_names:
subframe = dataframe[j]
ref_col = subframe.iloc[:, 0]
for k in range(1, len(subframe.columns)):
if not ref_col.equals(subframe.iloc[:, k]):
cannot_fix = True
break
if cannot_fix:
break
dupe_column_names = [f"- {i}" for i in dupe_column_names]
logger.error(
"DataFrame has non-unique columns\n" + "\n".join(dupe_column_names)
)
if cannot_fix:
raise ValueError("cannot convert DataFrame with non-unique columns")
else:
dataframe = dataframe.loc[:, ~dupe_columns]
if isinstance(dataframe.index, pd.CategoricalIndex):
idx = dataframe.index.remove_unused_categories()
else:
idx = dataframe.index
index_name = idx.name if idx.name is not None else "index"
# Cast to a NumPy array first, in case the Series is a pandas Extension
# array (which doesn't have a valid NumPy dtype)
arrays = {}
for name in dataframe.columns:
if name != index_name:
if dataframe[name].dtype == "category" and preserve_cat:
cat = dataframe[name].cat
categories = np.asarray(cat.categories)
if categories.dtype.kind == "O":
categories = categories.astype(str)
arrays[name] = (
[index_name],
np.asarray(cat.codes),
{
"digital_encoding": {
"dictionary": categories,
"ordered": cat.ordered,
}
},
)
else:
arrays[name] = ([index_name], np.asarray(dataframe[name].values))
return Dataset(arrays, coords={index_name: (index_name, dataframe.index.values)})
[docs]
def from_table(
tbl,
index_name="index",
index=None,
):
"""
Convert a pyarrow.Table into an xarray.Dataset.
Parameters
----------
tbl : Table
Table from which to use data and indices.
index_name : str, default 'index'
This name will be given to the default dimension index, if
none is given. Ignored if `index` is given explicitly and
it already has a name.
index : Index-like, optional
Use this index instead of a default RangeIndex.
Returns
-------
New Dataset.
"""
if len(set(tbl.column_names)) != len(tbl.column_names):
raise ValueError("cannot convert Table with non-unique columns")
if index is None:
index = pd.RangeIndex(len(tbl), name=index_name)
else:
if len(index) != len(tbl):
raise ValueError(
f"length of index ({len(index)}) does not match length of table ({len(tbl)})"
)
if isinstance(index, pd.MultiIndex) and not index.is_unique:
raise ValueError(
"cannot attach a non-unique MultiIndex and convert into xarray"
)
arrays = []
metadata = {}
for n in range(len(tbl.column_names)):
c = tbl.column(n)
if isinstance(c.type, pa.DictionaryType):
cc = c.combine_chunks()
arrays.append((tbl.column_names[n], np.asarray(cc.indices)))
metadata[tbl.column_names[n]] = {
"digital_encoding": {
"dictionary": cc.dictionary,
"ordered": cc.type.ordered,
}
}
else:
arrays.append((tbl.column_names[n], np.asarray(c)))
result = xr.Dataset()
if isinstance(index, pd.MultiIndex):
dims = tuple(
name if name is not None else "level_%i" % n
for n, name in enumerate(index.names)
)
for dim, lev in zip(dims, index.levels):
result[dim] = (dim, lev)
else:
try:
if index.name is not None:
index_name = index.name
except AttributeError:
pass
dims = (index_name,)
result[index_name] = (dims, index)
result._set_numpy_data_from_dataframe(index, arrays, dims)
for k, v in metadata.items():
result[k].attrs.update(v)
return result
[docs]
def from_omx(
omx,
index_names=("otaz", "dtaz"),
indexes="one-based",
renames=None,
):
"""
Create a Dataset from an OMX file.
Parameters
----------
omx : openmatrix.File or larch.OMX
An OMX-format file, opened for reading.
index_names : tuple, default ("otaz", "dtaz")
Should be a tuple of length 3, giving the names of the three
dimensions. The first two names are the native dimensions from
the open matrix file, the last is the name of the implicit
dimension that is created by parsing array names.
indexes : str or tuple[str], optional
The name of a 'lookup' in the OMX file, which will be used to
populate the coordinates for the two native dimensions. Or,
specify "one-based" or "zero-based" to assume sequential and
consecutive numbering starting with 1 or 0 respectively. For
non-square OMX data, this must be given as a tuple, relating
indexes as above for each dimension of `index_names`.
renames : Mapping or Collection, optional
Limit the import only to these data elements. If given as a
mapping, the keys will be the names of variables in the resulting
dataset, and the values give the names of data matrix tables in the
OMX file. If given as a list or other non-mapping collection,
elements are not renamed but only elements in the collection are
included.
Returns
-------
Dataset
"""
# handle both larch.OMX and openmatrix.open_file versions
if "lar" in type(omx).__module__:
omx_data = omx.data
omx_lookup = omx.lookup
omx_shape = omx.shape
else:
omx_data = omx.root["data"]
omx_lookup = omx.root["lookup"]
omx_shape = omx.shape()
arrays = {}
if renames is None:
for k in omx_data._v_children:
arrays[k] = omx_data[k][:]
elif isinstance(renames, dict):
for new_k, old_k in renames.items():
arrays[new_k] = omx_data[old_k][:]
else:
for k in renames:
arrays[k] = omx_data[k][:]
d = {
"dims": index_names,
"data_vars": {k: {"dims": index_names, "data": arrays[k]} for k in arrays},
}
if indexes == "one-based":
indexes = {
index_names[0]: one_based(omx_shape[0]),
index_names[1]: one_based(omx_shape[1]),
}
elif indexes == "zero-based":
indexes = {
index_names[0]: zero_based(omx_shape[0]),
index_names[1]: zero_based(omx_shape[1]),
}
elif isinstance(indexes, str):
if indexes in omx_lookup:
if omx_shape[0] != omx_shape[1]:
raise ValueError("singleton arbitrary coordinates on non-square arrays")
ixs = np.asarray(omx_lookup[indexes])
indexes = {
index_names[0]: ixs,
index_names[1]: ixs,
}
else:
raise KeyError(f"{indexes} not found in OMX lookups")
elif isinstance(indexes, tuple):
indexes_ = {}
for n, (name, i) in enumerate(zip(index_names, indexes)):
if i == "one-based":
indexes_[name] = one_based(omx_shape[n])
elif i == "zero-based":
indexes_[name] = zero_based(omx_shape[n])
elif isinstance(i, str):
if i in omx_lookup:
indexes_[name] = np.asarray(omx_lookup[i])
else:
raise KeyError(f"{i} not found in OMX lookups")
indexes = indexes_
if indexes is not None:
d["coords"] = {
index_name: {"dims": index_name, "data": index}
for index_name, index in indexes.items()
}
return xr.Dataset.from_dict(d)
[docs]
def from_omx_3d(
omx,
index_names=("otaz", "dtaz", "time_period"),
indexes=None,
*,
time_periods=None,
time_period_sep="__",
max_float_precision=32,
):
"""
Create a Dataset from an OMX file with an implicit third dimension.
Parameters
----------
omx : openmatrix.File or larch.OMX
An OMX-format file, opened for reading.
index_names : tuple, default ("otaz", "dtaz", "time_period")
Should be a tuple of length 3, giving the names of the three
dimensions. The first two names are the native dimensions from
the open matrix file, the last is the name of the implicit
dimension that is created by parsing array names.
indexes : str, optional
The name of a 'lookup' in the OMX file, which will be used to
populate the coordinates for the two native dimensions. Or,
specify "one-based" or "zero-based" to assume sequential and
consecutive numbering starting with 1 or 0 respectively.
time_periods : list-like, required keyword argument
A list of index values from which the third dimension is constructed
for all variables with a third dimension.
time_period_sep : str, default "__" (double underscore)
The presence of this separator within the name of any table in the
OMX file indicates that table is to be considered a page in a
three dimensional variable. The portion of the name preceding the
first instance of this separator is the name of the resulting
variable, and the portion of the name after the first instance of
this separator is the label of the position for this page, which
should appear in `time_periods`.
max_float_precision : int, default 32
When loading, reduce all floats in the OMX file to this level of
precision, generally to save memory if they were stored as double
precision but that level of detail is unneeded in the present
application.
Returns
-------
Dataset
"""
if not isinstance(omx, (list, tuple)):
omx = [omx]
# handle both larch.OMX and openmatrix.open_file versions
if "larch" in type(omx[0]).__module__:
omx_shape = omx[0].shape
omx_lookup = omx[0].lookup
else:
omx_shape = omx[0].shape()
omx_lookup = omx[0].root["lookup"]
omx_data = []
omx_data_map = {}
for n, i in enumerate(omx):
if "larch" in type(i).__module__:
omx_data.append(i.data)
for k in i.data._v_children:
omx_data_map[k] = n
else:
omx_data.append(i.root["data"])
for k in i.root["data"]._v_children:
omx_data_map[k] = n
import dask.array
data_names = list(omx_data_map.keys())
n1, n2 = omx_shape
if indexes is None:
# default reads mapping if only one lookup is included, otherwise one-based
if len(omx_lookup._v_children) == 1:
ranger = None
indexes = list(omx_lookup._v_children)[0]
else:
ranger = one_based
elif indexes == "one-based":
ranger = one_based
elif indexes == "zero-based":
ranger = zero_based
elif indexes in set(omx_lookup._v_children):
ranger = None
else:
raise NotImplementedError(
"only one-based, zero-based, and named indexes are implemented"
)
if ranger is not None:
r1 = ranger(n1)
r2 = ranger(n2)
else:
r1 = r2 = pd.Index(omx_lookup[indexes])
if time_periods is None:
raise ValueError("must give time periods explicitly")
time_periods_map = {t: n for n, t in enumerate(time_periods)}
pending_3d = {}
content = {}
for k in data_names:
if time_period_sep in k:
base_k, time_k = k.split(time_period_sep, 1)
if base_k not in pending_3d:
pending_3d[base_k] = [None] * len(time_periods)
pending_3d[base_k][time_periods_map[time_k]] = dask.array.from_array(
omx_data[omx_data_map[k]][k]
)
else:
content[k] = xr.DataArray(
dask.array.from_array(omx_data[omx_data_map[k]][k]),
dims=index_names[:2],
coords={
index_names[0]: r1,
index_names[1]: r2,
},
)
for base_k, darrs in pending_3d.items():
# find a prototype array
prototype = None
for i in darrs:
prototype = i
if prototype is not None:
break
if prototype is None:
raise ValueError("no prototype")
darrs_ = [
(i if i is not None else dask.array.zeros_like(prototype)) for i in darrs
]
content[base_k] = xr.DataArray(
dask.array.stack(darrs_, axis=-1),
dims=index_names,
coords={
index_names[0]: r1,
index_names[1]: r2,
index_names[2]: time_periods,
},
)
for i in content:
if np.issubdtype(content[i].dtype, np.floating):
if content[i].dtype.itemsize > max_float_precision / 8:
content[i] = content[i].astype(f"float{max_float_precision}")
return xr.Dataset(content)
def from_amx(
amx,
index_names=("otaz", "dtaz"),
indexes="one-based",
renames=None,
):
arrays = {}
if renames is None:
for k in amx.list_matrices():
arrays[k] = amx[k][:]
elif isinstance(renames, dict):
for new_k, old_k in renames.items():
arrays[new_k] = amx[old_k]
else:
for k in renames:
arrays[k] = amx[k]
d = {
"dims": index_names,
"data_vars": {k: {"dims": index_names, "data": arrays[k]} for k in arrays},
}
if indexes == "one-based":
indexes = {index_names[i]: "1" for i in range(len(index_names))}
elif indexes == "zero-based":
indexes = {index_names[i]: "0" for i in range(len(index_names))}
if isinstance(indexes, (list, tuple)):
indexes = dict(zip(index_names, indexes))
if isinstance(indexes, dict):
for n, i in enumerate(index_names):
if indexes.get(i) == "1":
indexes[i] = one_based(amx.shape[n])
elif indexes.get(i) == "0":
indexes[i] = zero_based(amx.shape[n])
if indexes is not None:
d["coords"] = {
index_name: {"dims": index_name, "data": index}
for index_name, index in indexes.items()
}
return xr.Dataset.from_dict(d)
[docs]
def from_zarr(store, *args, **kwargs):
"""
Load and decode a dataset from a Zarr store.
The `store` object should be a valid store for a Zarr group. `store`
variables must contain dimension metadata encoded in the
`_ARRAY_DIMENSIONS` attribute.
Parameters
----------
store : MutableMapping or str
A MutableMapping where a Zarr Group has been stored or a path to a
directory in file system where a Zarr DirectoryStore has been stored.
synchronizer : object, optional
Array synchronizer provided to zarr
group : str, optional
Group path. (a.k.a. `path` in zarr terminology.)
chunks : int or dict or tuple or {None, 'auto'}, optional
Chunk sizes along each dimension, e.g., ``5`` or
``{'x': 5, 'y': 5}``. If `chunks='auto'`, dask chunks are created
based on the variable's zarr chunks. If `chunks=None`, zarr array
data will lazily convert to numpy arrays upon access. This accepts
all the chunk specifications as Dask does.
overwrite_encoded_chunks : bool, optional
Whether to drop the zarr chunks encoded for each variable when a
dataset is loaded with specified chunk sizes (default: False)
decode_cf : bool, optional
Whether to decode these variables, assuming they were saved according
to CF conventions.
mask_and_scale : bool, optional
If True, replace array values equal to `_FillValue` with NA and scale
values according to the formula `original_values * scale_factor +
add_offset`, where `_FillValue`, `scale_factor` and `add_offset` are
taken from variable attributes (if they exist). If the `_FillValue` or
`missing_value` attribute contains multiple values a warning will be
issued and all array values matching one of the multiple values will
be replaced by NA.
decode_times : bool, optional
If True, decode times encoded in the standard NetCDF datetime format
into datetime objects. Otherwise, leave them encoded as numbers.
concat_characters : bool, optional
If True, concatenate along the last dimension of character arrays to
form string arrays. Dimensions will only be concatenated over (and
removed) if they have no corresponding variable and if they are only
used as the last dimension of character arrays.
decode_coords : bool, optional
If True, decode the 'coordinates' attribute to identify coordinates in
the resulting dataset.
drop_variables : str or iterable, optional
A variable or list of variables to exclude from being parsed from the
dataset. This may be useful to drop variables with problems or
inconsistent values.
consolidated : bool, optional
Whether to open the store using zarr's consolidated metadata
capability. Only works for stores that have already been consolidated.
By default (`consolidate=None`), attempts to read consolidated metadata,
falling back to read non-consolidated metadata if that fails.
chunk_store : MutableMapping, optional
A separate Zarr store only for chunk data.
storage_options : dict, optional
Any additional parameters for the storage backend (ignored for local
paths).
decode_timedelta : bool, optional
If True, decode variables and coordinates with time units in
{'days', 'hours', 'minutes', 'seconds', 'milliseconds', 'microseconds'}
into timedelta objects. If False, leave them encoded as numbers.
If None (default), assume the same value of decode_time.
use_cftime : bool, optional
Only relevant if encoded dates come from a standard calendar
(e.g. "gregorian", "proleptic_gregorian", "standard", or not
specified). If None (default), attempt to decode times to
``np.datetime64[ns]`` objects; if this is not possible, decode times to
``cftime.datetime`` objects. If True, always decode times to
``cftime.datetime`` objects, regardless of whether or not they can be
represented using ``np.datetime64[ns]`` objects. If False, always
decode times to ``np.datetime64[ns]`` objects; if this is not possible
raise an error.
Returns
-------
dataset : Dataset
The newly created dataset.
References
----------
http://zarr.readthedocs.io/
"""
return xr.open_zarr(store, *args, **kwargs)
def from_zarr_with_attr(*args, **kwargs):
obj = from_zarr(*args, **kwargs)
for k in obj:
attrs = {}
for aname, avalue in obj[k].attrs.items():
attrs[aname] = _from_evalable_string(avalue)
obj[k] = obj[k].assign_attrs(attrs)
attrs = {}
for aname, avalue in obj.attrs.items():
attrs[aname] = _from_evalable_string(avalue)
obj = obj.assign_attrs(attrs)
return obj
def coerce_to_range_index(idx):
if isinstance(idx, pd.RangeIndex):
return idx
if isinstance(idx, (pd.Int64Index, pd.Float64Index, pd.UInt64Index)):
if idx.is_monotonic_increasing and idx[-1] - idx[0] == idx.size - 1:
return pd.RangeIndex(idx[0], idx[0] + idx.size)
return idx
def is_dict_like(value: Any) -> bool:
return hasattr(value, "keys") and hasattr(value, "__getitem__")
@xr.register_dataset_accessor("single_dim")
class _SingleDim:
"""Convenience accessor for single-dimension datasets."""
__slots__ = ("dataset", "dim_name")
def __init__(self, dataset: Dataset):
self.dataset = dataset
if len(self.dataset.dims) != 1:
raise ValueError("single_dim implies a single dimension dataset")
self.dim_name = self.dataset.dims.__iter__().__next__()
@property
def coords(self):
return self.dataset.coords[self.dim_name]
@property
def index(self):
return self.dataset.indexes[self.dim_name]
@property
def size(self):
return self.dataset.dims[self.dim_name]
def _to_pydict(self):
columns = [k for k in self.dataset.variables if k != self.dim_name]
data = []
for k in columns:
a = self.dataset._variables[k]
if (
"digital_encoding" in a.attrs
and "dictionary" in a.attrs["digital_encoding"]
):
de = a.attrs["digital_encoding"]
data.append(
pd.Categorical.from_codes(
a.values,
de["dictionary"],
de.get("ordered"),
)
)
else:
data.append(a.values)
return dict(zip(columns, data))
def to_pyarrow(self) -> pa.Table:
columns = [k for k in self.dataset.variables if k != self.dim_name]
data = []
for k in columns:
a = self.dataset._variables[k]
if (
"digital_encoding" in a.attrs
and "dictionary" in a.attrs["digital_encoding"]
):
de = a.attrs["digital_encoding"]
data.append(
pa.DictionaryArray.from_arrays(
a.values,
de["dictionary"],
ordered=de.get("ordered", False),
)
)
else:
data.append(pa.array(a.values))
content = dict(zip(columns, data))
content[self.dim_name] = self.index
return pa.Table.from_pydict(content)
def to_parquet(self, filename):
import pyarrow.parquet as pq
t = self.to_pyarrow()
pq.write_table(t, filename)
def to_pandas(self) -> pd.DataFrame:
"""
Convert this dataset into a pandas DataFrame.
The resulting DataFrame is always a copy of the data in the dataset.
Returns
-------
pandas.DataFrame
"""
return pd.DataFrame(self._to_pydict(), index=self.index, copy=True)
def eval(
self,
expr: str,
parser: str = "pandas",
engine: str | None = None,
local_dict=None,
global_dict=None,
):
"""
Evaluate a Python expression as a string using various backends.
Parameters
----------
expr : str
The expression to evaluate. This string cannot contain any Python
`statements
<https://docs.python.org/3/reference/simple_stmts.html#simple-statements>`__,
only Python `expressions
<https://docs.python.org/3/reference/simple_stmts.html#expression-statements>`__.
parser : {'pandas', 'python'}, default 'pandas'
The parser to use to construct the syntax tree from the expression. The
default of ``'pandas'`` parses code slightly different than standard
Python. Alternatively, you can parse an expression using the
``'python'`` parser to retain strict Python semantics. See the
:ref:`enhancing performance <enhancingperf.eval>` documentation for
more details.
engine : {'python', 'numexpr'}, default 'numexpr'
The engine used to evaluate the expression. Supported engines are
- None : tries to use ``numexpr``, falls back to ``python``
- ``'numexpr'`` : This default engine evaluates pandas objects using
numexpr for large speed ups in complex expressions with large frames.
- ``'python'`` : Performs operations as if you had ``eval``'d in top
level python. This engine is generally not that useful.
local_dict : dict or None, optional
A dictionary of local variables, taken from locals() by default.
global_dict : dict or None, optional
A dictionary of global variables, taken from globals() by default.
Returns
-------
DataArray or numeric scalar
"""
result = pd.eval(
expr,
parser=parser,
engine=engine,
local_dict=local_dict,
global_dict=global_dict,
resolvers=[self.dataset],
)
if result.size == self.size:
return DataArray(np.asarray(result), coords=self.dataset.coords)
else:
return result
@xr.register_dataarray_accessor("single_dim")
class _SingleDimArray:
"""Convenience accessor for single-dimension datasets."""
__slots__ = ("dataarray", "dim_name")
def __init__(self, dataarray: DataArray):
self.dataarray = dataarray
if len(self.dataarray.dims) != 1:
raise ValueError("single_dim implies a single dimension dataset")
self.dim_name = self.dataarray.dims[0]
@property
def coords(self):
return self.dataarray.coords[self.dim_name]
@property
def index(self):
return self.dataarray.indexes[self.dim_name]
def rename(self, name: str) -> DataArray:
"""Rename the single dimension."""
if self.dim_name == name:
return self.dataarray
return self.dataarray.rename({self.dim_name: name})
def to_pandas(self) -> pd.Series:
"""
Convert this array into a pandas Series.
If this array is categorical (i.e. with a simple dictionary-based
digital encoding) then the result will be a Series with categorical dtype.
The DataArray's `name` attribute is preserved in the result.
"""
if self.dataarray.cat.is_categorical():
return pd.Series(
pd.Categorical.from_codes(
self.dataarray,
self.dataarray.cat.categories,
self.dataarray.cat.ordered,
),
index=self.index,
name=self.dataarray.name,
)
else:
result = self.dataarray.to_pandas()
if self.dataarray.name:
result = result.rename(self.dataarray.name)
return result
def to_pyarrow(self):
if self.dataarray.cat.is_categorical():
return pa.DictionaryArray.from_arrays(
self.dataarray.data, self.dataarray.cat.categories
)
else:
return pa.array(self.dataarray.data)
@xr.register_dataset_accessor("iloc")
class _iLocIndexer:
"""
Purely integer-location based indexing for selection by position on 1-d Datasets.
In many ways, a dataset with a single dimensions is like a pandas DataFrame,
with the one dimension giving the rows, and the variables as columns. This
analogy eventually breaks down (DataFrame columns are ordered, Dataset
variables are not) but the similarities are enough that it’s sometimes
convenient to have iloc functionality enabled. This only works for indexing
on the rows, but if there’s only the one dimension the complexity of isel
is not needed.
"""
__slots__ = ("dataset",)
def __init__(self, dataset: Dataset):
self.dataset = dataset
def __getitem__(self, key: Mapping[Hashable, Any]) -> Dataset:
if not is_dict_like(key):
if len(self.dataset.dims) == 1:
dim_name = self.dataset.dims.__iter__().__next__()
key = {dim_name: key}
else:
raise TypeError(
"can only lookup dictionaries from Dataset.iloc, "
"unless there is only one dimension"
)
return self.dataset.isel(key)
xr.Dataset.rename_dims_and_coords = xr.Dataset.rename
@register_dataset_method
def rename_or_ignore(self, dims_dict=None, **dims_kwargs):
from xarray.core.utils import either_dict_or_kwargs
dims_dict = either_dict_or_kwargs(dims_dict, dims_kwargs, "rename_dims_and_coords")
dims_dict = {
k: v for (k, v) in dims_dict.items() if (k in self.dims or k in self._variables)
}
return self.rename(dims_dict)
@register_dataset_method
def to_zarr_zip(self, *args, **kwargs):
"""
Write dataset contents to a zarr group.
Parameters
----------
store : MutableMapping, str or Path, optional
Store or path to directory in file system. If given with a
".zarr.zip" extension, and keyword arguments limited to 'mode' and
'compression', then a ZipStore will be created, populated, and then
immediately closed.
chunk_store : MutableMapping, str or Path, optional
Store or path to directory in file system only for Zarr array chunks.
Requires zarr-python v2.4.0 or later.
mode : {"w", "w-", "a", None}, optional
Persistence mode: "w" means create (overwrite if exists);
"w-" means create (fail if exists);
"a" means override existing variables (create if does not exist).
If ``append_dim`` is set, ``mode`` can be omitted as it is
internally set to ``"a"``. Otherwise, ``mode`` will default to
`w-` if not set.
synchronizer : object, optional
Zarr array synchronizer.
group : str, optional
Group path. (a.k.a. `path` in zarr terminology.)
encoding : dict, optional
Nested dictionary with variable names as keys and dictionaries of
variable specific encodings as values, e.g.,
``{"my_variable": {"dtype": "int16", "scale_factor": 0.1,}, ...}``
compute : bool, optional
If True write array data immediately, otherwise return a
``dask.delayed.Delayed`` object that can be computed to write
array data later. Metadata is always updated eagerly.
consolidated : bool, optional
If True, apply zarr's `consolidate_metadata` function to the store
after writing metadata.
append_dim : hashable, optional
If set, the dimension along which the data will be appended. All
other dimensions on overriden variables must remain the same size.
region : dict, optional
Optional mapping from dimension names to integer slices along
dataset dimensions to indicate the region of existing zarr array(s)
in which to write this dataset's data. For example,
``{'x': slice(0, 1000), 'y': slice(10000, 11000)}`` would indicate
that values should be written to the region ``0:1000`` along ``x``
and ``10000:11000`` along ``y``.
Two restrictions apply to the use of ``region``:
- If ``region`` is set, _all_ variables in a dataset must have at
least one dimension in common with the region. Other variables
should be written in a separate call to ``to_zarr()``.
- Dimensions cannot be included in both ``region`` and
``append_dim`` at the same time. To create empty arrays to fill
in with ``region``, use a separate call to ``to_zarr()`` with
``compute=False``. See "Appending to existing Zarr stores" in
the reference documentation for full details.
compression : int, optional
Only used for ".zarr.zip" files. By default zarr uses blosc
compression for chunks, so adding another layer of compression here
is typically redundant.
References
----------
https://zarr.readthedocs.io/
Notes
-----
Zarr chunking behavior:
If chunks are found in the encoding argument or attribute
corresponding to any DataArray, those chunks are used.
If a DataArray is a dask array, it is written with those chunks.
If not other chunks are found, Zarr uses its own heuristics to
choose automatic chunk sizes.
"""
if len(args) == 1 and isinstance(args[0], str) and args[0].endswith(".zarr.zip"):
if {"compression", "mode"}.issuperset(kwargs.keys()):
import zarr
with zarr.ZipStore(args[0], **kwargs) as store:
self.to_zarr(store)
return
return super().to_zarr(*args, **kwargs)
def _to_ast_literal(x):
if isinstance(x, dict):
return (
"{"
+ ", ".join(
f"{_to_ast_literal(k)}: {_to_ast_literal(v)}" for k, v in x.items()
)
+ "}"
)
elif isinstance(x, list):
return "[" + ", ".join(_to_ast_literal(i) for i in x) + "]"
elif isinstance(x, tuple):
return "(" + ", ".join(_to_ast_literal(i) for i in x) + ")"
elif isinstance(x, pd.Index):
return _to_ast_literal(x.to_list())
elif isinstance(x, np.ndarray):
return _to_ast_literal(list(x))
else:
return repr(x)
def _to_evalable_string(x):
if x is None:
return " < None > "
elif x is True:
return " < True > "
elif x is False:
return " < False > "
else:
return f" {_to_ast_literal(x)} "
def _from_evalable_string(x):
if isinstance(x, str):
# if x.startswith(" {") and x.endswith("} "):
# return ast.literal_eval(x[1:-1])
if x == " < None > ":
return None
if x == " < True > ":
return True
if x == " < False > ":
return False
if x.startswith(" ") and x.endswith(" "):
try:
return ast.literal_eval(x.strip(" "))
except Exception:
print(x)
raise
else:
return x
@register_dataset_method
def to_zarr_with_attr(self, *args, **kwargs):
"""
Write dataset contents to a zarr group.
Parameters
----------
store : MutableMapping, str or Path, optional
Store or path to directory in file system. If given with a
".zarr.zip" extension, and keyword arguments limited to 'mode' and
'compression', then a ZipStore will be created, populated, and then
immediately closed.
chunk_store : MutableMapping, str or Path, optional
Store or path to directory in file system only for Zarr array chunks.
Requires zarr-python v2.4.0 or later.
mode : {"w", "w-", "a", None}, optional
Persistence mode: "w" means create (overwrite if exists);
"w-" means create (fail if exists);
"a" means override existing variables (create if does not exist).
If ``append_dim`` is set, ``mode`` can be omitted as it is
internally set to ``"a"``. Otherwise, ``mode`` will default to
`w-` if not set.
synchronizer : object, optional
Zarr array synchronizer.
group : str, optional
Group path. (a.k.a. `path` in zarr terminology.)
encoding : dict, optional
Nested dictionary with variable names as keys and dictionaries of
variable specific encodings as values, e.g.,
``{"my_variable": {"dtype": "int16", "scale_factor": 0.1,}, ...}``
compute : bool, optional
If True write array data immediately, otherwise return a
``dask.delayed.Delayed`` object that can be computed to write
array data later. Metadata is always updated eagerly.
consolidated : bool, optional
If True, apply zarr's `consolidate_metadata` function to the store
after writing metadata.
append_dim : hashable, optional
If set, the dimension along which the data will be appended. All
other dimensions on overriden variables must remain the same size.
region : dict, optional
Optional mapping from dimension names to integer slices along
dataset dimensions to indicate the region of existing zarr array(s)
in which to write this dataset's data. For example,
``{'x': slice(0, 1000), 'y': slice(10000, 11000)}`` would indicate
that values should be written to the region ``0:1000`` along ``x``
and ``10000:11000`` along ``y``.
Two restrictions apply to the use of ``region``:
- If ``region`` is set, _all_ variables in a dataset must have at
least one dimension in common with the region. Other variables
should be written in a separate call to ``to_zarr()``.
- Dimensions cannot be included in both ``region`` and
``append_dim`` at the same time. To create empty arrays to fill
in with ``region``, use a separate call to ``to_zarr()`` with
``compute=False``. See "Appending to existing Zarr stores" in
the reference documentation for full details.
compression : int, optional
Only used for ".zarr.zip" files. By default zarr uses blosc
compression for chunks, so adding another layer of compression here
is typically redundant.
References
----------
https://zarr.readthedocs.io/
Notes
-----
Zarr chunking behavior:
If chunks are found in the encoding argument or attribute
corresponding to any DataArray, those chunks are used.
If a DataArray is a dask array, it is written with those chunks.
If not other chunks are found, Zarr uses its own heuristics to
choose automatic chunk sizes.
"""
obj = self.copy()
for k in self:
attrs = {}
for aname, avalue in self[k].attrs.items():
attrs[aname] = _to_evalable_string(avalue)
obj[k] = self[k].assign_attrs(attrs)
if hasattr(self, "coords"):
for k in self.coords:
attrs = {}
for aname, avalue in self.coords[k].attrs.items():
attrs[aname] = _to_evalable_string(avalue)
obj.coords[k] = self.coords[k].assign_attrs(attrs)
attrs = {}
for aname, avalue in self.attrs.items():
attrs[aname] = _to_evalable_string(avalue)
obj = obj.assign_attrs(attrs)
return obj.to_zarr(*args, **kwargs)
@register_dataset_method
def to_table(self):
"""
Convert dataset contents to a pyarrow Table.
This dataset must not contain more than one dimension.
"""
assert isinstance(self, Dataset)
if len(self.dims) != 1:
raise ValueError("Only 1-dim datasets can be converted to tables")
import pyarrow as pa
from .relationships import sparse_array_type
def to_numpy(var):
"""Coerces wrapped data to numpy and returns a numpy.ndarray."""
data = var.data
if hasattr(data, "chunks"):
data = data.compute()
if isinstance(data, sparse_array_type):
data = data.todense()
return np.asarray(data)
pydict = {}
for i in self.variables:
dictionary = self[i].attrs.get("DICTIONARY", None)
if dictionary is not None:
pydict[i] = pa.DictionaryArray.from_arrays(
to_numpy(self[i]),
dictionary,
)
else:
pydict[i] = pa.array(to_numpy(self[i]))
return pa.Table.from_pydict(pydict)
@register_dataset_method
def select_and_rename(self, name_dict=None, **names):
"""
Select and rename variables from this Dataset.
Parameters
----------
name_dict, **names: dict
The keys or keyword arguments give the current names of the
variables that will be selected out of this Dataset. The values
give the new names of the same variables in the resulting Dataset.
Returns
-------
Dataset
"""
if name_dict is None:
name_dict = names
else:
name_dict.update(names)
return self[list(name_dict.keys())].rename(name_dict)
@register_dataset_method
def max_float_precision(self, p=32):
"""
Set the maximum precision for floating point values.
This modifies the Dataset in-place.
Parameters
----------
p : {64, 32, 16}
The max precision to set.
Returns
-------
self
"""
for i in self:
if np.issubdtype(self[i].dtype, np.floating):
if self[i].dtype.itemsize > p / 8:
self[i] = self[i].astype(f"float{p}")
return self
@register_dataset_method
def interchange_dims(self, dim1, dim2):
"""
Rename a pair of dimensions by swapping their names.
Parameters
----------
dim1, dim2 : str
The names of the two dimensions to swap.
Returns
-------
Dataset
"""
p21 = "PLACEHOLD21"
p12 = "PLACEHOLD12"
s1 = {dim1: p12, dim2: p21}
s2 = {p12: dim2, p21: dim1}
rv = {}
vr = {}
if dim1 in self.variables:
rv[dim1] = p12
vr[p12] = dim2
if dim2 in self.variables:
rv[dim2] = p21
vr[p21] = dim1
return self.rename_dims(s1).rename_vars(rv).rename_dims(s2).rename_vars(vr)
[docs]
def from_named_objects(*args):
"""
Create a Dataset by populating it with named objects.
A mapping of names to values is first created, and then that mapping is
used in the standard constructor to initialize a Dataset.
Parameters
----------
*args : Any
A collection of objects, each exposing a `name` attribute.
Returns
-------
Dataset
"""
objs = {}
for n, a in enumerate(args):
try:
name = a.name
except AttributeError:
raise ValueError(f"argument {n} has no name") from None
if name is None:
raise ValueError(f"the name for argument {n} is None")
objs[name] = a
return xr.Dataset(objs)
@register_dataset_method
def ensure_integer(dataset, names, bitwidth=32, inplace=False):
"""
Convert dataset variables to integers, if they are not already integers.
Parameters
----------
names : Iterable[str]
Variable names in this dataset to convert.
bitwidth : int, default 32
Bit width of integers that are created when a conversion is made.
Note that variables that are already integer are not modified,
even if their bit width differs from this.
inplace : bool, default False
Whether to make the conversion in-place on this Dataset, or
return a copy.
Returns
-------
Dataset
"""
if inplace:
result = dataset
else:
result = dataset.copy()
for name in names:
if name not in result:
continue
if not np.issubdtype(result[name].dtype, np.integer):
result[name] = result[name].astype(f"int{bitwidth}")
if not inplace:
return result
def filter_name_tokens(expr, matchable_names=None):
name_tokens = extract_all_name_tokens(expr)
name_tokens -= {"_args", "_inputs", "_outputs", "np"}
name_tokens -= well_known_names
if matchable_names:
name_tokens &= matchable_names
return name_tokens
def _dyno(k, v):
if isinstance(v, str) and v[0] == "@":
return f"__dynamic_{k}{v}"
elif v is None:
return f"__dynamic_{k}"
else:
return v
def _flip_flop_def(v):
if "# sharrow:" in v:
return v.split("# sharrow:", 1)[1].strip()
else:
return v