
Commit 629368b

Merge pull request #254 from sadielbartholomew/lama-to-dask-6-new
`Data.equals`: add unit test & migrate to Dask
2 parents 8c9d14f + 07d937d commit 629368b

17 files changed: +870 −92 lines

.github/workflows/dask-migration-testing.yml

Lines changed: 8 additions & 1 deletion
@@ -100,13 +100,20 @@ jobs:
       - name: Notify about starting testing
         run: echo Setup complete. Now starting to run the cf-python test suite...
 
-      # Finally run test_Data.py!
+      # Finally run the relevant tests: firstly test_Data.py...
       - name: Run the test_Data test module
         shell: bash -l {0}
         run: |
           cd ${{ github.workspace }}/main/cf/test
           python test_Data.py
+
+      # ... and finally test_Data_utils.py.
+      - name: Run the test_Data_utils test module
+        shell: bash -l {0}
+        run: |
+          cd ${{ github.workspace }}/main/cf/test
+          python test_Data_utils.py
 
       # End with a message indicating the suite has completed its run
       - name: Notify about a completed run
         run: |

cf/cellmethod.py

Lines changed: 3 additions & 3 deletions
@@ -104,8 +104,8 @@ def create(cls, cell_methods_string=None):
         #
         # ['lat:', 'mean', '(', 'interval:', '1', 'hour', ')']
         # ------------------------------------------------------------
-        cell_methods = re.sub("\((?=[^\s])", "( ", cell_methods_string)
-        cell_methods = re.sub("(?<=[^\s])\)", " )", cell_methods).split()
+        cell_methods = re.sub(r"\((?=[^\s])", "( ", cell_methods_string)
+        cell_methods = re.sub(r"(?<=[^\s])\)", " )", cell_methods).split()
 
         while cell_methods:
             cm = cls()
@@ -156,7 +156,7 @@ def create(cls, cell_methods_string=None):
         if not (re.search("^(interval|comment):$", cell_methods[0])):
             cell_methods.insert(0, "comment:")
 
-        while not re.search("^\)$", cell_methods[0]):
+        while not re.search(r"^\)$", cell_methods[0]):
             term = cell_methods.pop(0)[:-1]
 
             if term == "interval":
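The raw-string change above silences Python's invalid-escape-sequence deprecation warnings without altering behaviour. The padding step itself can be exercised in isolation (the sample string below is illustrative, taken from the comment in the diff):

```python
import re

# Pad the inside of parentheses with spaces so that a cell methods
# string splits cleanly on whitespace, as in cf/cellmethod.py.
cell_methods_string = "lat: mean (interval: 1 hour)"

# "(interval:" -> "( interval:" via a lookahead for a non-space after "("
cell_methods = re.sub(r"\((?=[^\s])", "( ", cell_methods_string)
# "hour)" -> "hour )" via a lookbehind for a non-space before ")"
cell_methods = re.sub(r"(?<=[^\s])\)", " )", cell_methods).split()

print(cell_methods)
# → ['lat:', 'mean', '(', 'interval:', '1', 'hour', ')']
```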

cf/data/README.rst

Lines changed: 173 additions & 1 deletion
@@ -1,8 +1,78 @@
 `cf.Data` developer notes
 =========================
 
+Masked arrays
+-------------
+
+Whether there is a mask or not
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+For methods such as `equals`, we need to consider whether an array is
+a masked one, and if so we need to compare the *masks* (e.g. whether
+they are equal) as well as the *data*.
+
+The difficulty is that some level of inspection, i.e. computation, is
+required to know whether the object in question is masked or not (this
+is due, fundamentally, to the underlying netCDF or PP representation),
+and we want to avoid early computation because it is inefficient.
+
+Consider, for example, a chain of computations in which an array may
+or may not acquire a mask: until `compute` is run, we don't know
+whether there is a mask at the end. Note the distinction here between
+a standard array and a masked array, where the latter may have a
+trivial (say, all-`False`) or non-trivial mask, e.g. for Dask arrays
+(and similarly for `np.ma` etc.):
+
+**Masked array with a non-trivial mask:**
+
+.. code-block:: python
+
+   >>> dx = da.from_array(np.ma.array([1, 2, 3], mask=[1, 0, 0]))
+   >>> dx
+   dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.MaskedArray>
+
+**Masked array with a trivial, i.e. all-`False`, mask:**
+
+.. code-block:: python
+
+   >>> dy = da.from_array(np.ma.array([1, 2, 3], mask=[0, 0, 0]))
+   >>> dy
+   dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.MaskedArray>
+
+**Standard array, i.e. no mask:**
+
+.. code-block:: python
+
+   >>> dz = da.from_array(np.array([1, 2, 3]))
+   >>> dz
+   dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.ndarray>
+
+
+Solution
+########
+
+To work around not being able to know, ahead of computation, whether
+an array that may acquire a mask is in fact masked, we will in all
+such cases use the fact that standard arrays (i.e. example 3 above)
+can also be queried with `da.ma.getmaskarray`, which returns an
+all-False mask, just as it would for a masked array with an all-False
+mask (i.e. example 2 above):
+
+.. code-block:: python
+
+   >>> dz = da.from_array(np.array([1, 2, 3]))  # i.e. example 3 above
+   >>> mz = da.ma.getmaskarray(dz)
+   >>> mz.compute()
+   array([False, False, False])
+
+   >>> dy = da.from_array(np.ma.array([1, 2, 3], mask=[0, 0, 0]))  # i.e. example 2
+   >>> my = da.ma.getmaskarray(dy)
+   >>> my.compute()
+   array([False, False, False])
+
+
 Hardness of the mask
---------------------
+^^^^^^^^^^^^^^^^^^^^
 
 Any `cf.Data` method that changes the dask array should consider
 whether or not the mask hardness needs resetting before
@@ -22,3 +92,105 @@ The mask hardness is most easily reset with the
 
 `cf.Data.__setitem__` and `cf.Data.where` are examples of methods that
 need to reset the mask in this manner.
+
+
+Laziness
+--------
+
+To *be* lazy, or *not to be* lazy (in `cf.Data` itself)?
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Central to Dask is lazy execution, i.e. delayed computation: Dask
+operations essentially construct a graph of calculations or
+transformations that are ready to run later, and only get evaluated
+together when requested with a `<dask object>.compute` call.
+
+We want to utilise this laziness because it is central to the
+efficiency gained from using Dask, but to what extent do we want to
+incorporate laziness into `cf.Data`? Namely, for an arbitrary
+`cf.Data` method previously returning some result (say, a Boolean or
+an array), which of these should we return:
+
+1. The **pre-computed result**, i.e. the outcome from running
+   `compute` on the result graph constructed in the method (e.g. the
+   same Boolean or array, etc., as before); or
+2. The **uncomputed result**, i.e. a Dask object which only evaluates
+   to the result in (1) when either the user or the code
+   under-the-hood later runs a `compute`?
+
+Arguments for choice (1) [advantages of (1) and disadvantages of (2)]:
+
+* The simpler choice:
+
+  * output is the same as before, so documentation is easier and
+    there is less change relative to previous versions;
+  * logging and error handling can remain simple and as-is, whereas
+    with choice (2) we don't know whether a given log or error
+    message, dependent on the outcome, is applicable, so we can't
+    emit it immediately (perhaps at all?). We might have to defer to
+    standard Dask messages, which would reduce specificity and
+    clarity.
+
+* Testing will be simpler, as with (2) we would have to add `compute`
+  calls at appropriate points before running test assertions, etc.
+* Inspection methods can return as they do now, whereas with choice
+  (2) we would have to work out what to show when certain aspects
+  aren't yet computed.
+
+Arguments for choice (2):
+
+* The technically more complicated but, overall, more efficient
+  choice:
+
+  * it is more efficient when we build up chains of operations,
+    because avoiding intermediate computation means parallelisation
+    can be optimised more comprehensively by Dask.
+
+As well as choice (1) or (2) outright, there are further options for
+a mixture, or a flexible choice, of return object:
+
+3. Make use of a common keyword argument such as `precompute` on
+   methods, so that users, and the code under-the-hood, can dictate
+   whether to return the pre-computed or the uncomputed result. That
+   would give extra flexibility, but mean more boilerplate code
+   (which can be consolidated somewhat, but at best will require some
+   extra lines per method).
+
+   If this option is chosen, what would the best default be, `True`
+   or `False`?
+
+4. (DH's suggestion) Methods that return new `cf.Data` objects (such
+   as `transpose`) should be lazy, and other methods (e.g. `__repr__`
+   and `equals`) should not be.
+
+**We have agreed that (4) is the most sensible approach to take,
+therefore the working plan is** that:
+
+* **any method (previously) returning a cf.Data object will,
+  post-daskification, be lazy and return the uncomputed result**,
+  i.e. a Dask object that, when computed, will evaluate to the final
+  `cf.Data` object (e.g. if computed immediately after the method
+  runs, the result would be the same `cf.Data` object as that
+  previously returned); but
+* **any method returning another object, such as a Boolean or a
+  string representation, will not be lazy and will return the
+  pre-computed object as before**.
+
+
+Logging and error handling
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When Dask operations are uncomputed, we don't know whether certain
+logging and error messages are applicable or not.
+
+Can we raise these in a delayed way when we don't want to compute
+early, e.g. when we are in the middle of under-the-hood operations,
+and also perhaps if we choose case (2) from the above points on the
+extent of laziness? How can it be done? Possible ideas include:
+
+* using a `try/except` block whenever a custom error message is
+  required, catching the corresponding Dask errors and raising our
+  own messages.
+
+
+Inheritance from `cfdm`
+-----------------------
+
+Generally, how do we deal with optimisation for objects and logic
+inherited from `cfdm`, given that the current plan is not to Daskify
+`cfdm.Data`?
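The mask-comparison strategy these notes describe, combined with the agreed approach (4) (an `equals`-style method returns a pre-computed Boolean), can be sketched as follows. This is an illustrative standalone function, not the cf-python implementation; the name `lazily_equal` and the tolerance defaults are made up for the example:

```python
import dask.array as da
import numpy as np


def lazily_equal(dx, dy, rtol=1e-05, atol=1e-08):
    """Sketch of an equals-style check on two dask arrays.

    Masks and data are compared separately; da.ma.getmaskarray
    returns an all-False mask for unmasked arrays, so the same
    code path works whether or not a mask is present.
    """
    # Build the (lazy) graphs for mask equality and data equality...
    masks_equal = (da.ma.getmaskarray(dx) == da.ma.getmaskarray(dy)).all()
    data_equal = da.allclose(
        da.ma.getdata(dx), da.ma.getdata(dy), rtol=rtol, atol=atol
    )
    # ...and compute only once, at the end, returning a plain Boolean
    # (the non-lazy return agreed for methods like `equals`).
    return bool((masks_equal & data_equal).compute())


dx = da.from_array(np.ma.array([1, 2, 3], mask=[1, 0, 0]))
dy = da.from_array(np.ma.array([1, 2, 3], mask=[0, 0, 0]))
print(lazily_equal(dx, dx))  # True
print(lazily_equal(dx, dy))  # False: the data agree but the masks differ
```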

cf/data/abstract/compressedsubarray.py

Lines changed: 2 additions & 0 deletions
@@ -2,6 +2,8 @@
 from functools import reduce
 from operator import mul
 
+from ...functions import inspect as cf_inspect
+
 
 class CompressedSubarray(abc.ABC):
     """Abstract base class for a compressed sub-array container."""

cf/data/creation.py

Lines changed: 22 additions & 22 deletions
@@ -2,26 +2,21 @@
 from functools import lru_cache
 from uuid import uuid4
 
-import numpy as np
-
 import dask.array as da
+import numpy as np
 from dask.array.core import getter, normalize_chunks, slices_from_chunks
-from dask.utils import SerializableLock
 from dask.base import tokenize
 from dask.config import config
-
-from ..units import Units
-
-from .utils import chunk_shapes, chunk_positions
+from dask.utils import SerializableLock
 
 from . import (
     FilledArray,
     GatheredSubarray,
     RaggedContiguousSubarray,
-    RaggedIndexedSubarray,
     RaggedIndexedContiguousSubarray,
+    RaggedIndexedSubarray,
 )
-
+from .utils import chunk_positions, chunk_shapes
 
 # Cache of axis identities
 _cached_axes = {}
@@ -139,8 +134,9 @@ def compressed_to_dask(array):
 
     count = array.get_count().dask_array(copy=False)
 
-    if is_small(count):
-        count = count.compute()
+    # TODODASK: remove with #297 merge
+    # if is_small(count):
+    #     count = count.compute()
 
     # Find the chunk sizes and positions of the uncompressed
    # array. Each chunk will contain the data for one instance,
@@ -198,8 +194,9 @@ def compressed_to_dask(array):
 
     _, inverse = da.unique(index, return_inverse=True)
 
-    if is_very_small(index):
-        inverse = inverse.compute()
+    # TODODASK: remove with #297 merge
+    # if is_very_small(index):
+    #     inverse = inverse.compute()
 
     chunks = normalize_chunks(
         (1,) + (-1,) * (uncompressed_ndim - 1),
@@ -236,14 +233,16 @@ def compressed_to_dask(array):
     index = array.get_index().dask_array(copy=False)
     count = array.get_count().dask_array(copy=False)
 
-    if is_small(index):
-        index = index.compute()
-        index_is_dask = False
-    else:
-        index_is_dask = True
+    # TODODASK: remove with #297 merge
+    # if is_small(index):
+    #     index = index.compute()
+    #     index_is_dask = False
+    # else:
+    index_is_dask = True
 
-    if is_small(count):
-        count = count.compute()
+    # TODODASK: remove with #297 merge
+    # if is_small(count):
+    #     count = count.compute()
 
     cumlative_count = count.cumsum(axis=0)
 
@@ -268,8 +267,9 @@ def compressed_to_dask(array):
             xprofile_indices = np.where(index == i)[0]
             if index_is_dask:
                 xprofile_indices.compute_chunk_sizes()
-                if is_small(xprofile_indices):
-                    xprofile_indices = xprofile_indices.compute()
+                # TODODASK: remove with #297 merge
+                # if is_small(xprofile_indices):
+                #     xprofile_indices = xprofile_indices.compute()
             # --- End: if
 
             # Find the number of actual profiles in this instance
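The `normalize_chunks` call retained above chunks the uncompressed array so each chunk holds one instance: `(1,) + (-1,) * (ndim - 1)` means size-1 chunks along the first (instance) axis and whole-axis chunks elsewhere. A minimal sketch of that behaviour, with a made-up shape (4 instances, each padded to 7 elements):

```python
from dask.array.core import normalize_chunks

# Hypothetical uncompressed shape: 4 instances of up to 7 elements each.
uncompressed_shape = (4, 7)
uncompressed_ndim = len(uncompressed_shape)

# One chunk per instance along axis 0; -1 keeps the remaining axes whole.
chunks = normalize_chunks(
    (1,) + (-1,) * (uncompressed_ndim - 1),
    shape=uncompressed_shape,
)
print(chunks)
# → ((1, 1, 1, 1), (7,))
```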
