Commit a959cc73 authored by BorjaEst

Merge branch '32-model-to-dataset-subclass'

parents f0502d69 6816f855
......@@ -7,16 +7,16 @@ o3skim package
:show-inheritance:
o3skim.sources module
---------------------
o3skim.standardization module
-----------------------------
.. automodule:: o3skim.sources
.. automodule:: o3skim.standardization
:members:
:undoc-members:
:show-inheritance:
o3skim.utils module
-------------------
------------------------------
.. automodule:: o3skim.utils
:members:
......
......@@ -2,11 +2,14 @@
"""
o3skim is a tool for data pre-processing of ozone applications
"""
from o3skim import sources, utils
import sys
import argparse
import logging
import warnings
import o3skim
from o3skim import utils
warnings.simplefilter(action='ignore', category=FutureWarning)
......@@ -29,7 +32,7 @@ if __name__ == '__main__':
args = cmdline_args()
# Set logging level
logformat = '%(asctime)s %(name)-16s %(levelname)-8s %(message)s'
logformat = '%(asctime)s %(name)-24s %(levelname)-8s %(message)s'
logging.basicConfig(level=getattr(logging, args.verbosity), format=logformat)
# Configuration load
......@@ -39,7 +42,7 @@ if __name__ == '__main__':
# Create sources
logging.info("Loading data from './data' ")
with utils.cd("data"):
ds = {name: sources.Source(name, collection) for
ds = {name: o3skim.Source(name, collection) for
name, collection in config.items()}
# Skim output
......
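For reference, the configuration consumed by this loop maps source names to model collections, and each model maps variable names to load details. A minimal sketch of the expected shape, using illustrative values taken from the test fixtures (tests/sources_base.yaml and tests/sources_err.yaml):

# Hypothetical configuration dictionary: {source: {model: {variable: details}}}
config = {
    "ECMWF": {
        "ERA-i": {
            "vmro3_zm": {
                "name": "vmro3",                      # variable name inside the netCDF files
                "paths": "Ecmwf/Erai/vmro3_????.nc",  # glob pattern matching the data files
                "coordinates": {                      # standard name -> source name
                    "time": "time",
                    "plev": "level",
                    "lat": "latitude",
                    "lon": "longitude",
                },
            },
        },
    },
}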
"""
Main package with classes and utilities to handle ozone data skimming.
"""
\ No newline at end of file
Sources are responsible for loading the netCDF files from data and
for performing the standardization during the process.
Once the data are loaded and the instances created,
the internal methods can be used to produce
the desired output.
"""
import logging
import os
from o3skim import standardization
import xarray as xr
logger = logging.getLogger('o3skim')
class Source:
"""Conceptual class for a data source. It is produced by the loading
and generation of internal instances from each data source model.
:param sname: Name to provide to the source. The folder name with the
skimmed output data is preceded with this name before '_'.
:type sname: str
:param collections: Dictionary where each 'key' is a name and its
value another dictionary with the variables contained at this
model. See :class:`o3skim.Model` for further details.
:type collections: dict
"""
def __init__(self, sname, collections):
self._name = sname
self._models = {}
logging.info("Load source '%s'", self._name)
for name, variables in collections.items():
logging.info("Load model '%s'", name)
self._models[name] = Model(variables)
def skim(self, groupby=None, **kwargs):
"""Request to skim all source data into the current folder
:param groupby: How to group output (None, year, decade).
:type groupby: str, optional
"""
for name, model in self._models.items():
dirname = self._name + "_" + name
os.makedirs(dirname, exist_ok=True)
logger.info("Skim data from '%s'", dirname)
model.to_netcdf(dirname, groupby, **kwargs)
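# Illustrative example: a source named "ECMWF" holding a model "ERA-i"
# writes its skimmed files into a folder named "ECMWF_ERA-i".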
class Model(xr.Dataset):
"""Conceptual class for model with variables. It is produced by the
loading of the variables to be skimmed.
:param variables: Dictionary with information about how to load
the source where each 'key' is the name of the variable to
load and the value is the load details.
:type variables: dict
"""
def __init__(self, specifications):
ds = xr.Dataset()
for variable in specifications:
load = standardization.load(variable, specifications[variable])
ds = ds.merge(load)
# Containment: assign through __dict__ to bypass the __setattr__
# delegation below, which would otherwise recurse on 'dataset'
self.__dict__['dataset'] = ds
def __getattr__(self, attr):
# Delegation (get)
return getattr(self.dataset, attr)
def __setattr__(self, attr, value):
# Delegation (set)
setattr(self.dataset, attr, value)
def groupby(self, delta='year'):
"""Returns a GroupBy object for performing grouped operations
over the time coordinate.
A groupby operation involves some combination of splitting
the object, applying a function, and combining the results.
:param delta: How to group files (None, year, decade).
:type delta: str or None, optional
:return: A list of tuples (range, group model).
:rtype: [(str, Model)]
"""
if delta is None:
return [("", self)]
logger.debug("Group model by: '{0}' ".format(delta))
if delta == 'year':
delta = 1
if delta == 'decade':
delta = 10
years = self.indexes['time'].map(lambda x: x.year // delta * delta)
group = super().groupby(xr.DataArray(years))
return [(str(y) + "-" + str(y + delta), ds) for y, ds in group]
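# Illustrative example: for data covering 2000-2009, groupby('decade')
# maps every year to 2000 and yields [("2000-2010", <group dataset>)];
# groupby(None) returns [("", self)] unchanged.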
def split_variables(self):
"""Request to split model variables into individual models.
:return: A list of tuples (variable, var Model).
:rtype: [(str, Model)]
"""
var_models = []
for var in self.data_vars:
logger.debug("Internal var: '{0}' to dataset".format(var))
var_models.append((var, self[var].to_dataset()))
return var_models
def to_netcdf(self, path, delta=None, **kwargs):
"""Request to save model data into the specified path
:param path: Path where to place the output files.
:type path: str
:param delta: How to group output (None, year, decade).
:type delta: str or None, optional
For extra inputs see also :func:`xarray.save_mfdataset`
"""
datasets = []
paths = []
for t_range, ds1 in self.groupby(delta=delta):
for var, ds2 in ds1.split_variables():
datasets.append(ds2)
if t_range == "":
paths.append(path + "/" + var + ".nc")
else:
paths.append(path + "/" + var + "_" + t_range + ".nc")
logger.info("Save dataset into: %s", paths)
if paths:
xr.save_mfdataset(datasets, paths, **kwargs)
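Putting the new classes together, a hedged end-to-end sketch follows; the directory layout and the resulting file names are assumptions drawn from the test suite (e.g. 'output/ECMWF_ERA-i/tco3_zm_2000-2001.nc'), not guarantees of this commit:

import o3skim
from o3skim import utils

# 'config' is a source configuration like the sketch shown earlier
with utils.cd("data"):              # read inputs relative to ./data
    source = o3skim.Source("ECMWF", config["ECMWF"])
with utils.cd("output"):            # write outputs relative to ./output
    source.skim(groupby="year")     # one netCDF file per variable and year range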
"""Module in charge of creating the sources objects.
This objects are responsible of loading and the netCDF
files from data and do the standardization during the
process.
After the data are loaded and the instances created,
it is possible to use the internal methods to produce
the desired output.
"""
import glob
import xarray as xr
import os.path
import logging
from . import utils
logger = logging.getLogger('o3skim.sources')
class Source:
"""Conceptual class for a data source. It is produced by the loading
and generation of internal instances from each data source model.
:param sname: Name to provide to the source. The folder name with the
skimmed output data is preceded with this name before '_'.
:type sname: str
:param collections: Dictionary where each 'key' is a name and its
value another dictionary with the variables contained at this
model. See :class:`o3skim.sources.Model` for further details.
:type collections: dict
"""
def __init__(self, sname, collections):
self._name = sname
self._models = {}
logging.info("Load source '%s'", self._name)
for name, variables in collections.items():
logging.info("Load model '%s'", name)
self._models[name] = Model(variables)
def skim(self, groupby=None):
"""Request to skim all source data into the current folder
:param groupby: How to group output (None, year, decade).
:type groupby: str, optional
"""
for name, model in self._models.items():
dirname = self._name + "_" + name
os.makedirs(dirname, exist_ok=True)
logger.info("Skim data from '%s'", dirname)
model.skim(dirname, groupby)
class Model:
"""Conceptual class for model with variables. It is produced by the
loading of the variables to be skimmed.
:param variables: Dictionary with information about how to load
the source where each 'key' is the name of the variable to
load and the value is the load details.
:type variables: dict
"""
def __init__(self, variables):
if 'tco3_zm' in variables:
logger.debug("Load 'tco3_zm' data")
self.__get_tco3_zm(**variables)
if 'vmro3_zm' in variables:
logger.debug("Load 'vmro3_zm' data")
self.__get_vmro3_zm(**variables)
def skim(self, dirname, groupby=None):
"""Request to skim all source data into the specified path
:param dirname: Path where to place the output files.
:type dirname: str
:param groupby: How to group output (None, year, decade).
:type groupby: str, optional
"""
if hasattr(self, '_tco3_zm'):
logger.debug("Skim 'tco3_zm' data")
utils.to_netcdf(dirname, "tco3_zm", self._tco3_zm, groupby)
if hasattr(self, '_vmro3_zm'):
logger.debug("Skim 'vmro3_zm' data")
utils.to_netcdf(dirname, "vmro3_zm", self._vmro3_zm, groupby)
@utils.return_on_failure("Error when loading 'tco3_zm'")
def __get_tco3_zm(self, tco3_zm, **kwarg):
"""Gets and standarises the tco3_zm data"""
with xr.open_mfdataset(tco3_zm['paths']) as dataset:
dataset = dataset.rename({
tco3_zm['name']: 'tco3_zm',
tco3_zm['coordinates']['time']: 'time',
tco3_zm['coordinates']['lat']: 'lat',
tco3_zm['coordinates']['lon']: 'lon'
})['tco3_zm'].to_dataset()
self._tco3_zm = dataset.mean(dim='lon')
@utils.return_on_failure("Error when loading 'vmro3_zm'")
def __get_vmro3_zm(self, vmro3_zm, **kwarg):
"""Gets and standarises the vmro3_zm data"""
with xr.open_mfdataset(vmro3_zm['paths']) as dataset:
dataset = dataset.rename({
vmro3_zm['name']: 'vmro3_zm',
vmro3_zm['coordinates']['time']: 'time',
vmro3_zm['coordinates']['plev']: 'plev',
vmro3_zm['coordinates']['lat']: 'lat',
vmro3_zm['coordinates']['lon']: 'lon'
})['vmro3_zm'].to_dataset()
self._vmro3_zm = dataset.mean(dim='lon')
"""Module in charge of dataset standardization when loading models."""
import logging
import xarray as xr
from o3skim import utils
logger = logging.getLogger('o3skim.standardization')
# tco3 standardization
tco3_standard_name = 'tco3_zm'
tco3_mean_coordinate = 'lon'
tco3_standard_coordinates = [
'time',
'lat',
'lon'
]
# vmro3 standardization
vmro3_standard_name = 'vmro3_zm'
vmro3_mean_coordinate = 'lon'
vmro3_standard_coordinates = [
'time',
'plev',
'lat',
'lon'
]
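# In short: loading renames the source variable and coordinates to the
# standard names above and averages over 'lon', producing zonal means.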
@utils.return_on_failure("Error when loading '{0}'".format(tco3_standard_name),
xr.Dataset())
def __load_tco3(name, paths, coordinates):
"""Loads and standarises the tco3 data"""
logger.debug("Standard loading of '{0}' data".format(tco3_standard_name))
with xr.open_mfdataset(paths) as dataset:
dataset = dataset.rename({
**{name: tco3_standard_name},
**{coordinates[x]: x for x in tco3_standard_coordinates}
})[tco3_standard_name].to_dataset()
return dataset.mean(dim=tco3_mean_coordinate)
@utils.return_on_failure("Error when loading '{0}'".format(vmro3_standard_name),
xr.Dataset())
def __load_vmro3(name, paths, coordinates):
"""Loads and standarises the vmro3 data"""
logger.debug("Standard loading of '{0}' data".format(vmro3_standard_name))
with xr.open_mfdataset(paths) as dataset:
dataset = dataset.rename({
**{name: vmro3_standard_name},
**{coordinates[x]: x for x in vmro3_standard_coordinates}
})[vmro3_standard_name].to_dataset()
return dataset.mean(dim=vmro3_mean_coordinate)
# Load case dictionary
__loads = {
tco3_standard_name: __load_tco3,
vmro3_standard_name: __load_vmro3
}
# Non existing variable exception
class UnknownVariable(Exception):
"""To raise if variable to treat is unknown"""
def __init__(self, variable, message="Unknown variable"):
self.variable = variable
self.message = message
super().__init__(self.message)
def load(variable, configuration):
"""Loads and standarises the variable using a specific
configuration.
:param variable: Loadable variable.
:type variable: str
:param configuration: Configuration to apply standardization.
:type configuration: dict
:return: A standardized dataset.
:rtype: xarray.Dataset
"""
try:
function = __loads[variable]
except KeyError:
raise UnknownVariable(variable)
return function(**configuration)
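As a usage sketch, the entry point can be called as below; the source variable name 'toz' and the paths are illustrative values from the test configuration:

from o3skim import standardization

# Returns a standardized xr.Dataset; an unknown variable raises
# UnknownVariable, while a load failure is logged and an empty
# xr.Dataset is returned (see return_on_failure above).
dataset = standardization.load(
    variable="tco3_zm",
    configuration={
        "name": "toz",                      # source variable name (illustrative)
        "paths": "Ecmwf/Erai/toz_????.nc",  # glob pattern (illustrative)
        "coordinates": {"time": "time", "lat": "latitude", "lon": "longitude"},
    },
)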
......@@ -35,20 +35,24 @@ def cd(newdir):
logger.debug("Restore directory: '%s'", prevdir)
def return_on_failure(message):
def return_on_failure(message, default=None):
"""Decorator to do not break but log. Note that the error stack
is printed as well to do not lose relevant information.
:param message: Additional message to log when the function fails.
:type message: str
:param default: Value to return if the function fails.
:type default: any or None, optional
"""
def decorate(function):
def applicator(*args, **kwargs):
try:
function(*args, **kwargs)
return function(*args, **kwargs)
except Exception:
# Log error with stak using root (not utils)
# Log error with stack using root (not utils)
logging.error(message, exc_info=True)
return default
return applicator
return decorate
......
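For illustration, a minimal sketch of how the extended decorator can be applied; the open_data function below is hypothetical:

from o3skim import utils
import xarray as xr

@utils.return_on_failure("Could not open the dataset", default=xr.Dataset())
def open_data(paths):
    # Any exception raised here is logged together with its stack
    # trace, and the empty default dataset is returned instead.
    return xr.open_mfdataset(paths)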
ECMWF:
ERA-i:
ErrorModels:
correct_variable:
vmro3_zm: # Correct variable
name: vmro3
paths: Ecmwf/Erai/vmro3_????.nc
......@@ -8,6 +8,7 @@ ECMWF:
plev: level
lat: latitude
lon: longitude
non_existing_variable:
tco3_zm: # Incorrect variable
name: non_existing_var
paths: Ecmwf/Erai/toz_????.nc
......
"""Unittest module template."""
import os
import shutil
import unittest
import xarray as xr
import glob
from o3skim import sources, utils
# from pyfakefs.fake_filesystem_unittest import TestCase
from . import mockup_data
from . import mockup_noise
import o3skim
import xarray as xr
from tests import mockup_data
from tests import mockup_noise
class TestO3SKIM_sources(unittest.TestCase):
......@@ -18,9 +17,9 @@ class TestO3SKIM_sources(unittest.TestCase):
def setUp(self):
"""Loads and creates the test folders and files from test_sources.yaml"""
self.config_base = utils.load("tests/sources_base.yaml")
self.config_base = o3skim.utils.load("tests/sources_base.yaml")
self.assertTrue(type(self.config_base) is dict)
self.config_err = utils.load("tests/sources_err.yaml")
self.config_err = o3skim.utils.load("tests/sources_err.yaml")
self.assertTrue(type(self.config_err) is dict)
self.create_mock_datasets()
......@@ -33,7 +32,7 @@ class TestO3SKIM_sources(unittest.TestCase):
def create_mock_datasets(self):
"""Creates mock data files according to the loaded configuration"""
with utils.cd('data'):
with o3skim.utils.cd('data'):
for _, collection in self.config_base.items():
for _, variables in collection.items():
for _, vinfo in variables.items():
......@@ -43,33 +42,33 @@ class TestO3SKIM_sources(unittest.TestCase):
def create_noise_datasets(self):
"""Creates noise data files according to the noise configuration"""
config_noise = utils.load("tests/noise_files.yaml")
with utils.cd('data'):
config_noise = o3skim.utils.load("tests/noise_files.yaml")
with o3skim.utils.cd('data'):
for ninfo in config_noise:
mockup_noise.netcdf(**ninfo)
def clean_output(self):
"""Cleans output removing all folders at output"""
with utils.cd('output'):
with o3skim.utils.cd('output'):
directories = (d for d in os.listdir() if os.path.isdir(d))
for directory in directories:
shutil.rmtree(directory)
shutil.rmtree(directory)
def backup_datasets(self):
"""Loads the mock datasets into an internal variable"""
self.ds_backup = {}
with utils.cd('data'):
with o3skim.utils.cd('data'):
for source, collection in self.config_base.items():
self.ds_backup[source] = {}
for model, variables in collection.items():
self.ds_backup[source][model] = {}
for v, vinfo in variables.items():
for v, vinfo in variables.items():
with xr.open_mfdataset(vinfo['paths']) as ds:
self.ds_backup[source][model][v] = ds
def assert_with_backup(self):
"""Asserts the dataset in the backup is equal to the config load"""
with utils.cd('data'):
with o3skim.utils.cd('data'):
for source, collection in self.config_base.items():
for model, variables in collection.items():
for v, vinfo in variables.items():
......@@ -79,31 +78,31 @@ class TestO3SKIM_sources(unittest.TestCase):
def test_001_SourcesFromConfig(self):
"""Creates the different sources from the configuration file"""
with utils.cd("data"):
ds = {name: sources.Source(name, collection) for
with o3skim.utils.cd("data"):
ds = {name: o3skim.Source(name, collection) for
name, collection in self.config_base.items()}
# CCMI-1 tco3_zm asserts
self.assertTrue('time' in ds['CCMI-1']._models['IPSL']._tco3_zm.coords)
self.assertTrue('lat' in ds['CCMI-1']._models['IPSL']._tco3_zm.coords)
self.assertFalse('lon' in ds['CCMI-1']._models['IPSL']._tco3_zm.coords)
self.assertTrue( 'time' in ds['CCMI-1']._models['IPSL']['tco3_zm'].coords)
self.assertTrue( 'lat' in ds['CCMI-1']._models['IPSL']['tco3_zm'].coords)
self.assertFalse( 'lon' in ds['CCMI-1']._models['IPSL']['tco3_zm'].coords)
# CCMI-1 vmro3_zm asserts
self.assertTrue('time' in ds['CCMI-1']._models['IPSL']._vmro3_zm.coords)
self.assertTrue('plev' in ds['CCMI-1']._models['IPSL']._vmro3_zm.coords)
self.assertTrue('lat' in ds['CCMI-1']._models['IPSL']._vmro3_zm.coords)
self.assertFalse('lon' in ds['CCMI-1']._models['IPSL']._vmro3_zm.coords)
self.assertTrue( 'time' in ds['CCMI-1']._models['IPSL']['vmro3_zm'].coords)
self.assertTrue( 'plev' in ds['CCMI-1']._models['IPSL']['vmro3_zm'].coords)
self.assertTrue( 'lat' in ds['CCMI-1']._models['IPSL']['vmro3_zm'].coords)
self.assertFalse( 'lon' in ds['CCMI-1']._models['IPSL']['vmro3_zm'].coords)
# Checks the original data has not been modified
self.assert_with_backup()
def test_002_OutputFromSources(self):
"""Skims the data into the output folder"""
with utils.cd("data"):
ds = {name: sources.Source(name, collection) for
with o3skim.utils.cd("data"):
ds = {name: o3skim.Source(name, collection) for
name, collection in self.config_base.items()}
with utils.cd("output"):
with o3skim.utils.cd("output"):
[source.skim() for source in ds.values()]
# CCMI-1 data skim asserts
......@@ -125,24 +124,24 @@ class TestO3SKIM_sources(unittest.TestCase):
def test_003_OutputSplitByYear(self):
"""Skims the data into the output folder spliting by year"""
with utils.cd("data"):
ds = {name: sources.Source(name, collection) for
with o3skim.utils.cd("data"):
ds = {name: o3skim.Source(name, collection) for
name, collection in self.config_base.items()}
with utils.cd("output"):
with o3skim.utils.cd("output"):
[source.skim(groupby="year") for source in ds.values()]
# CCMI-1 data skim asserts
self.assertTrue(os.path.isdir("output/CCMI-1_IPSL"))
self.assertTrue(os.path.exists("output/CCMI-1_IPSL/tco3_zm_2000.nc"))
self.assertTrue(os.path.exists("output/CCMI-1_IPSL/vmro3_zm_2000.nc"))
self.assertTrue(os.path.exists("output/CCMI-1_IPSL/tco3_zm_2000-2001.nc"))
self.assertTrue(os.path.exists("output/CCMI-1_IPSL/vmro3_zm_2000-2001.nc"))
# ECMWF data skim asserts
self.assertTrue(os.path.isdir("output/ECMWF_ERA-5"))
self.assertTrue(os.path.exists("output/ECMWF_ERA-5/tco3_zm_2000.nc"))
self.assertTrue(os.path.exists("output/ECMWF_ERA-5/tco3_zm_2000-2001.nc"))
self.assertTrue(os.path.isdir("output/ECMWF_ERA-i"))
self.assertTrue(os.path.exists("output/ECMWF_ERA-i/tco3_zm_2000.nc"))
self.assertTrue(os.path.exists("output/ECMWF_ERA-i/vmro3_zm_2000.nc"))
self.assertTrue(os.path.exists("output/ECMWF_ERA-i/tco3_zm_2000-2001.nc"))
self.assertTrue(os.path.exists("output/ECMWF_ERA-i/vmro3_zm_2000-2001.nc"))
# Checks the original data has not been modified
self.assert_with_backup()
......@@ -151,16 +150,20 @@ class TestO3SKIM_sources(unittest.TestCase):
def test_004_SourceErrorDontBreak(self):
"""The execution does not stop by an error in source"""
with utils.cd("data"):
ds = {name: sources.Source(name, collection) for
with o3skim.utils.cd("data"):
ds = {name: o3skim.Source(name, collection) for