synergy / o3skim · Commits

Commit b61a0440, authored Jan 25, 2021 by BorjaEst

    Code cleaning

Parent: 7059e402
Changes: 3
o3skim/__init__.py
"""
Main package with classes and utilities to handle ozone data skimming.
Sources are responsible of loading the netCDF files from data and do
the standardization during the process.
After the data are loaded and the instances created,
it is possible to use the internal methods to produce
the desired output.
"""
import
logging
import
os
import
unittest
import
xarray
as
xr
import
pandas
as
pd
import
numpy
as
np
from
o3skim
import
extended_xarray
from
o3skim
import
source
from
o3skim
import
standardization
from
o3skim
import
utils
logger
=
logging
.
getLogger
(
'o3skim'
)
class
Source
:
"""Conceptual class for a data source. It is produced by the loading
and generation of internal instances from each data source model.
:param name: Name to provide to the source. The folder name with the
skimmed output data is preceded with this name before '_'.
:type name: str
:param collections: Dictionary where each 'key' is a name and its
value another dictionary with the variables contained at this
model. See :class:`o3skim.Model` for further details.
:type collections: dict
"""
def
__init__
(
self
,
name
,
collections
):
self
.
name
=
name
self
.
_models
=
{}
logger
.
info
(
"Loading source '%s'"
,
self
.
name
)
for
name
,
specifications
in
collections
.
items
():
logger
.
info
(
"Loading model '%s'"
,
name
)
model
=
_load_model
(
**
specifications
)
if
model
:
self
.
_models
[
name
]
=
model
def
__getitem__
(
self
,
model_name
):
return
self
.
_models
[
model_name
]
@
property
def
models
(
self
):
return
list
(
self
.
_models
.
keys
())
def
skim
(
self
,
groupby
=
None
):
"""Request to skim all source data into the current folder
:param groupby: How to group output (None, year, decade).
:type groupby: str, optional
"""
for
model
in
self
.
_models
:
dirname
=
"{source}_{model}"
.
format
(
source
=
self
.
name
,
model
=
model
)
os
.
makedirs
(
dirname
,
exist_ok
=
True
)
logger
.
info
(
"Skimming data from '%s'"
,
dirname
)
with
utils
.
cd
(
dirname
):
_skim
(
self
[
model
],
delta
=
groupby
)
@
utils
.
return_on_failure
(
"Error when loading model"
,
default
=
None
)
def
_load_model
(
tco3_zm
=
None
,
vmro3_zm
=
None
):
"""Loads a model merging standardized data from specified datasets."""
dataset
=
xr
.
Dataset
()
if
tco3_zm
:
logger
.
debug
(
"Loading tco3_zm into model"
)
with
xr
.
open_mfdataset
(
tco3_zm
[
'paths'
])
as
load
:
standardized
=
standardization
.
standardize_tco3
(
dataset
=
load
,
variable
=
tco3_zm
[
'name'
],
coordinates
=
tco3_zm
[
'coordinates'
])
dataset
=
dataset
.
merge
(
standardized
)
if
vmro3_zm
:
logger
.
debug
(
"Loading vmro3_zm into model"
)
with
xr
.
open_mfdataset
(
vmro3_zm
[
'paths'
])
as
load
:
standardized
=
standardization
.
standardize_vmro3
(
dataset
=
load
,
variable
=
vmro3_zm
[
'name'
],
coordinates
=
vmro3_zm
[
'coordinates'
])
dataset
=
dataset
.
merge
(
standardized
)
return
dataset
def
_skim
(
model
,
delta
=
None
):
"""Skims model producing reduced dataset files"""
logger
.
debug
(
"Skimming model with delta {}"
.
format
(
delta
))
skimmed
=
model
.
model
.
skim
()
if
delta
==
'year'
:
def
tco3_path
(
y
):
return
"tco3_zm_{}-{}.nc"
.
format
(
y
,
y
+
1
)
def
vmro3_path
(
y
):
return
"vmro3_zm_{}-{}.nc"
.
format
(
y
,
y
+
1
)
groups
=
skimmed
.
model
.
groupby_year
()
elif
delta
==
'decade'
:
def
tco3_path
(
y
):
return
"tco3_zm_{}-{}.nc"
.
format
(
y
,
y
+
10
)
def
vmro3_path
(
y
):
return
"vmro3_zm_{}-{}.nc"
.
format
(
y
,
y
+
10
)
groups
=
skimmed
.
model
.
groupby_decade
()
else
:
def
tco3_path
(
_
):
return
"tco3_zm.nc"
def
vmro3_path
(
_
):
return
"vmro3_zm.nc"
groups
=
[(
None
,
skimmed
),
]
years
,
datasets
=
zip
(
*
groups
)
if
skimmed
.
model
.
tco3
:
logger
.
debug
(
"Saving skimed tco3 into files"
)
xr
.
save_mfdataset
(
datasets
=
[
ds
.
model
.
tco3
for
ds
in
datasets
],
paths
=
[
tco3_path
(
year
)
for
year
in
years
]
)
if
skimmed
.
model
.
vmro3
:
logger
.
debug
(
"Saving skimed vmro3 into files"
)
xr
.
save_mfdataset
(
datasets
=
[
ds
.
model
.
vmro3
for
ds
in
datasets
],
paths
=
[
vmro3_path
(
year
)
for
year
in
years
]
)
class
TestsSource
(
unittest
.
TestCase
):
name
=
"SourceTest"
collections
=
{}
# Empty, only to test constructor stability
def
setUp
(
self
):
self
.
source
=
Source
(
TestsSource
.
name
,
TestsSource
.
collections
)
def
test_property_name
(
self
):
expected
=
TestsSource
.
name
result
=
self
.
source
.
name
self
.
assertEqual
(
expected
,
result
)
def
test_property_models
(
self
):
expected
=
list
(
TestsSource
.
collections
.
keys
())
result
=
self
.
source
.
models
self
.
assertEqual
(
expected
,
result
)
class
TestsModel
(
unittest
.
TestCase
):
tco3
=
np
.
random
.
rand
(
3
,
3
,
25
)
vmro3
=
np
.
random
.
rand
(
3
,
3
,
4
,
25
)
@
staticmethod
def
model
():
return
xr
.
Dataset
(
data_vars
=
dict
(
tco3_zm
=
([
"lon"
,
"lat"
,
"time"
],
TestsModel
.
tco3
),
vmro3_zm
=
([
"lon"
,
"lat"
,
"plev"
,
"time"
],
TestsModel
.
vmro3
)
),
coords
=
dict
(
lon
=
[
-
180
,
0
,
180
],
lat
=
[
-
90
,
0
,
90
],
plev
=
[
1
,
10
,
100
,
1000
],
time
=
pd
.
date_range
(
"2000-01-01"
,
periods
=
25
,
freq
=
'A'
)
),
attrs
=
dict
(
description
=
"Test dataset"
)
)
def
assertHasAttr
(
self
,
obj
,
intendedAttr
):
testBool
=
hasattr
(
obj
,
intendedAttr
)
msg
=
'obj lacking an attribute. obj: %s, intendedAttr: %s'
%
(
obj
,
intendedAttr
)
self
.
assertTrue
(
testBool
,
msg
=
msg
)
def
test_dataset_has_model_accessor
(
self
):
model
=
TestsModel
.
model
()
self
.
assertHasAttr
(
model
,
'model'
)
Source
=
source
.
Source
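For context, here is a minimal usage sketch of the `Source` API above. The source name, model name, glob pattern, variable name, and coordinate mapping are all hypothetical; the exact shape of the `coordinates` mapping depends on `standardization.standardize_tco3`, which is not shown in this commit.

```python
# Hypothetical configuration following the 'collections' structure that
# Source.__init__ unpacks into _load_model(tco3_zm=..., vmro3_zm=...).
collections = {
    "ModelA": {                            # model name (hypothetical)
        "tco3_zm": {
            "name": "tco3",                # variable name inside the netCDF files
            "paths": "ModelA/tco3_*.nc",   # glob passed to xr.open_mfdataset
            "coordinates": {               # assumed source-to-standard mapping
                "time": "time", "lat": "latitude", "lon": "longitude"},
        },
    },
}

src = Source("SourceA", collections)
print(src.models)          # -> ['ModelA'] if loading succeeded
src.skim(groupby="year")   # writes SourceA_ModelA/tco3_zm_<y>-<y+1>.nc files
```

Note that `_load_model` is wrapped in `utils.return_on_failure`, so a model that fails to load is skipped (it returns the default `None` and is not added to `_models`) rather than aborting the whole source.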
o3skim/source.py (new file, mode 100644)
"""
Module in charge of source implementation.
Sources are responsible of loading the netCDF files from data and do
the standardization during the process.
"""
import
logging
import
os
import
unittest
import
pandas
as
pd
import
numpy
as
np
import
xarray
as
xr
from
o3skim
import
extended_xarray
from
o3skim
import
standardization
from
o3skim
import
utils
logger
=
logging
.
getLogger
(
'source'
)
class
Source
:
"""Conceptual class for a data source. It is produced by the loading
and generation of internal instances from each data source model.
:param name: Name to provide to the source. The folder name with the
skimmed output data is preceded with this name before '_'.
:type name: str
:param collections: Dictionary where each 'key' is a name and its
value another dictionary with the variables contained at this
model. See :class:`o3skim.Model` for further details.
:type collections: dict
"""
def
__init__
(
self
,
name
,
collections
):
self
.
name
=
name
self
.
_models
=
{}
logger
.
info
(
"Loading source '%s'"
,
self
.
name
)
for
name
,
specifications
in
collections
.
items
():
logger
.
info
(
"Loading model '%s'"
,
name
)
model
=
_load_model
(
**
specifications
)
if
model
:
self
.
_models
[
name
]
=
model
def
__getitem__
(
self
,
model_name
):
return
self
.
_models
[
model_name
]
@
property
def
models
(
self
):
return
list
(
self
.
_models
.
keys
())
def
skim
(
self
,
groupby
=
None
):
"""Request to skim all source data into the current folder
:param groupby: How to group output (None, year, decade).
:type groupby: str, optional
"""
for
model
in
self
.
_models
:
dirname
=
"{source}_{model}"
.
format
(
source
=
self
.
name
,
model
=
model
)
os
.
makedirs
(
dirname
,
exist_ok
=
True
)
logger
.
info
(
"Skimming data from '%s'"
,
dirname
)
with
utils
.
cd
(
dirname
):
_skim
(
self
[
model
],
delta
=
groupby
)
@
utils
.
return_on_failure
(
"Error when loading model"
,
default
=
None
)
def
_load_model
(
tco3_zm
=
None
,
vmro3_zm
=
None
):
"""Loads a model merging standardized data from specified datasets."""
dataset
=
xr
.
Dataset
()
if
tco3_zm
:
logger
.
debug
(
"Loading tco3_zm into model"
)
with
xr
.
open_mfdataset
(
tco3_zm
[
'paths'
])
as
load
:
standardized
=
standardization
.
standardize_tco3
(
dataset
=
load
,
variable
=
tco3_zm
[
'name'
],
coordinates
=
tco3_zm
[
'coordinates'
])
dataset
=
dataset
.
merge
(
standardized
)
if
vmro3_zm
:
logger
.
debug
(
"Loading vmro3_zm into model"
)
with
xr
.
open_mfdataset
(
vmro3_zm
[
'paths'
])
as
load
:
standardized
=
standardization
.
standardize_vmro3
(
dataset
=
load
,
variable
=
vmro3_zm
[
'name'
],
coordinates
=
vmro3_zm
[
'coordinates'
])
dataset
=
dataset
.
merge
(
standardized
)
return
dataset
def
_skim
(
model
,
delta
=
None
):
"""Skims model producing reduced dataset files"""
logger
.
debug
(
"Skimming model with delta {}"
.
format
(
delta
))
skimmed
=
model
.
model
.
skim
()
if
delta
==
'year'
:
def
tco3_path
(
y
):
return
"tco3_zm_{}-{}.nc"
.
format
(
y
,
y
+
1
)
def
vmro3_path
(
y
):
return
"vmro3_zm_{}-{}.nc"
.
format
(
y
,
y
+
1
)
groups
=
skimmed
.
model
.
groupby_year
()
elif
delta
==
'decade'
:
def
tco3_path
(
y
):
return
"tco3_zm_{}-{}.nc"
.
format
(
y
,
y
+
10
)
def
vmro3_path
(
y
):
return
"vmro3_zm_{}-{}.nc"
.
format
(
y
,
y
+
10
)
groups
=
skimmed
.
model
.
groupby_decade
()
else
:
def
tco3_path
(
_
):
return
"tco3_zm.nc"
def
vmro3_path
(
_
):
return
"vmro3_zm.nc"
groups
=
[(
None
,
skimmed
),
]
years
,
datasets
=
zip
(
*
groups
)
if
skimmed
.
model
.
tco3
:
logger
.
debug
(
"Saving skimed tco3 into files"
)
xr
.
save_mfdataset
(
datasets
=
[
ds
.
model
.
tco3
for
ds
in
datasets
],
paths
=
[
tco3_path
(
year
)
for
year
in
years
]
)
if
skimmed
.
model
.
vmro3
:
logger
.
debug
(
"Saving skimed vmro3 into files"
)
xr
.
save_mfdataset
(
datasets
=
[
ds
.
model
.
vmro3
for
ds
in
datasets
],
paths
=
[
vmro3_path
(
year
)
for
year
in
years
]
)
class
TestsSource
(
unittest
.
TestCase
):
name
=
"SourceTest"
collections
=
{}
# Empty, only to test constructor stability
def
setUp
(
self
):
self
.
source
=
Source
(
TestsSource
.
name
,
TestsSource
.
collections
)
def
test_property_name
(
self
):
expected
=
TestsSource
.
name
result
=
self
.
source
.
name
self
.
assertEqual
(
expected
,
result
)
def
test_property_models
(
self
):
expected
=
list
(
TestsSource
.
collections
.
keys
())
result
=
self
.
source
.
models
self
.
assertEqual
(
expected
,
result
)
class
TestsModel
(
unittest
.
TestCase
):
tco3
=
np
.
random
.
rand
(
3
,
3
,
25
)
vmro3
=
np
.
random
.
rand
(
3
,
3
,
4
,
25
)
@
staticmethod
def
model
():
return
xr
.
Dataset
(
data_vars
=
dict
(
tco3_zm
=
([
"lon"
,
"lat"
,
"time"
],
TestsModel
.
tco3
),
vmro3_zm
=
([
"lon"
,
"lat"
,
"plev"
,
"time"
],
TestsModel
.
vmro3
)
),
coords
=
dict
(
lon
=
[
-
180
,
0
,
180
],
lat
=
[
-
90
,
0
,
90
],
plev
=
[
1
,
10
,
100
,
1000
],
time
=
pd
.
date_range
(
"2000-01-01"
,
periods
=
25
,
freq
=
'A'
)
),
attrs
=
dict
(
description
=
"Test dataset"
)
)
def
assertHasAttr
(
self
,
obj
,
intendedAttr
):
testBool
=
hasattr
(
obj
,
intendedAttr
)
msg
=
'obj lacking an attribute. obj: %s, intendedAttr: %s'
%
(
obj
,
intendedAttr
)
self
.
assertTrue
(
testBool
,
msg
=
msg
)
def
test_dataset_has_model_accessor
(
self
):
model
=
TestsModel
.
model
()
self
.
assertHasAttr
(
model
,
'model'
)
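The helpers `utils.cd` and `utils.return_on_failure` are used throughout both files but are not part of this diff. A minimal sketch of how such helpers could look, assuming the conventional implementations of these patterns (this is an assumption, not the actual o3skim code):

```python
import contextlib
import functools
import logging
import os


@contextlib.contextmanager
def cd(newdir):
    """Context manager: temporarily change the working directory."""
    prevdir = os.getcwd()
    os.chdir(newdir)
    try:
        yield
    finally:
        os.chdir(prevdir)  # restore even if the body raises


def return_on_failure(message, default=None):
    """Decorator factory: log `message` and return `default` on exception."""
    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            try:
                return function(*args, **kwargs)
            except Exception:
                logging.error(message, exc_info=True)
                return default
        return wrapper
    return decorator
```

Under this reading, `Source.skim` always returns to the original directory after writing each model's output, and a failing `_load_model` degrades to `None` instead of raising.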
o3skim/utils.py
@@ -73,59 +73,3 @@ def load(yaml_file):
```python
        config = yaml.safe_load(ymlfile)
        logging.debug("Configuration data: %s", config)
        return config


def create_empty_netCDF(fname):
    """Creates a new empty netCDF file.

    :param fname: Name and path where to create the file.
    :type fname: str
    """
    root_grp = netCDF4.Dataset(fname, 'w', format='NETCDF4')
    root_grp.description = 'Example simulation data'
    root_grp.close()


def to_netcdf(dirname, name, dataset, groupby=None):
    """Creates or appends data to named netCDF files.

    :param dirname: Location where to find or create the netCDF files.
    :type dirname: str

    :param name: Name/prefix for the file(s) where to store the data.
    :type name: str

    :param dataset: Dataset to write to the netCDF file.
    :type dataset: :class:`xarray.Dataset`

    :param groupby: How to group files (None, year, decade).
    :type groupby: str, optional
    """
    def split_by_year(dataset):
        """Splits a dataset by year"""
        years, dsx = zip(*dataset.groupby("time.year"))
        fnames = [dirname + "/" + name + "_%s.nc" % y for y in years]
        return fnames, dsx

    def split_by_decade(dataset):
        """Splits a dataset by decade"""
        decades = dataset.indexes["time"].year // 10 * 10
        decades, dsx = zip(*dataset.groupby(xr.DataArray(decades)))
        fnames = [dirname + "/" + name + "_%s-%s.nc" % (d, d + 10)
                  for d in decades]
        return fnames, dsx

    def no_split(dataset):
        """Does not split a dataset"""
        dsx = (dataset,)
        fnames = [dirname + "/" + name + ".nc"]
        return fnames, dsx

    split_by = {"year": split_by_year, "decade": split_by_decade}
    fnames, dsx = split_by.get(groupby, no_split)(dataset)
    logging.info("Save dataset into: %s", fnames)
    [create_empty_netCDF(fn) for fn in fnames if not os.path.isfile(fn)]
    xr.save_mfdataset(dsx, fnames, mode='a')
```
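A hedged usage sketch for the removed `to_netcdf` (the directory, prefix, and dataset below are illustrative, and it assumes the helper behaves as written above):

```python
import os
import numpy as np
import pandas as pd
import xarray as xr

# Illustrative 25-year dataset spanning 2000-2024 (freq='A' = year-end).
ds = xr.Dataset(
    {"tco3_zm": (["time"], np.random.rand(25))},
    coords={"time": pd.date_range("2000-01-01", periods=25, freq="A")},
)

# The target directory must already exist; netCDF4 will not create it.
os.makedirs("output", exist_ok=True)

# groupby="decade" buckets years as year // 10 * 10, so this is intended
# to write output/tco3_2000-2010.nc, output/tco3_2010-2020.nc and
# output/tco3_2020-2030.nc.
to_netcdf("output", "tco3", ds, groupby="decade")
```

Design-wise, `to_netcdf` first creates an empty netCDF file for any missing path and then writes with `mode='a'`, so repeated calls append to the same files instead of overwriting them.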