Commit 6e57281c authored by BorjaEst's avatar BorjaEst
Browse files

Merge branch 'dev' into 20-resume-skimming-if-one-dataset-fails

parents 36f3b91e ca894a83
......@@ -26,20 +26,18 @@ if __name__ == '__main__':
args = cmdline_args()
# Set logging level
logging.basicConfig(level=getattr(logging, args.verbosity),
format='%(asctime)s - %(levelname)-8s - %(message)s')
logformat = '%(asctime)s %(name)-16s %(levelname)-8s %(message)s'
logging.basicConfig(level=getattr(logging, args.verbosity), format=logformat)
# Configuration load
logging.info("Lookinf for config at: '%s'", args.sources_file)
config = utils.load(args.sources_file)
logging.info("Configuration found at: '%s'", args.sources_file)
logging.debug("Configuration info: %s", config)
# Create sources
logging.info("Loading data from './data' ")
with utils.cd("data"):
ds = {name: sources.Source(name, collection) for
name, collection in config.items()}
logging.debug("Sources loaded: %s", [k for k in config.keys()])
# Skim output
logging.info("Skimming data to './output' ")
......
......@@ -2,8 +2,11 @@
import glob
import xarray as xr
import os.path
import logging
from . import utils
logger = logging.getLogger('o3skim.sources')
class Source:
"""Standarized datasets and methods from a data source"""
......@@ -11,13 +14,16 @@ class Source:
def __init__(self, sname, collections):
self._name = sname
self._models = {}
logging.info("Load source '%s'", self._name)
for name, variables in collections.items():
logging.info("Load model '%s'", name)
self._models[name] = Model(variables)
def skim(self):
for name, model in self._models.items():
path = self._name + "_" + name
os.makedirs(path, exist_ok=True)
logger.info("Skim data from '%s'", path)
model.skim(path)
......@@ -25,38 +31,44 @@ class Model:
"""Standarised model with standarised variables"""
def __init__(self, variables):
self.__get_tco3_zm(**variables)
self.__get_vrm_zm(**variables)
if 'tco3_zm' in variables:
logger.debug("Load 'tco3_zm' data")
self.__get_tco3_zm(**variables)
if 'vrm_zm' in variables:
logger.debug("Load 'vrm_zm' data")
self.__get_vrm_zm(**variables)
def skim(self, path):
    """Write the model's standardized variables as netCDF files under *path*.

    :param path: Directory the per-variable netCDF files are written to
        (created by the caller; see Source.skim).

    Only variables that were actually loaded (i.e. whose private
    attribute exists on this instance) are written; a variable whose
    load failed earlier is silently skipped, so one bad dataset does
    not abort the whole skim.
    """
    if hasattr(self, '_tco3_zm'):
        logger.debug("Skim 'tco3_zm' data")
        utils.to_netcdf(path, "tco3_zm", self._tco3_zm)
    if hasattr(self, '_vrm_zm'):
        logger.debug("Skim 'vrm_zm' data")
        utils.to_netcdf(path, "vrm_zm", self._vrm_zm)
def __get_tco3_zm(self, tco3_zm=None, **kwarg):
@utils.return_on_failure("Error when loading 'tco3_zm'")
def __get_tco3_zm(self, tco3_zm, **kwarg):
"""Gets and standarises the tco3_zm data"""
if tco3_zm:
fnames = glob.glob(tco3_zm['dir'] + "/*.nc")
with xr.open_mfdataset(fnames) as dataset:
dataset = dataset.rename({
tco3_zm['name']: 'tco3_zm',
tco3_zm['coordinades']['time']: 'time',
tco3_zm['coordinades']['lat']: 'lat',
tco3_zm['coordinades']['lon']: 'lon'
})['tco3_zm'].to_dataset()
self._tco3_zm = dataset.mean(dim='lon')
def __get_vrm_zm(self, vrm_zm=None, **kwarg):
fnames = glob.glob(tco3_zm['dir'] + "/*.nc")
with xr.open_mfdataset(fnames) as dataset:
dataset = dataset.rename({
tco3_zm['name']: 'tco3_zm',
tco3_zm['coordinades']['time']: 'time',
tco3_zm['coordinades']['lat']: 'lat',
tco3_zm['coordinades']['lon']: 'lon'
})['tco3_zm'].to_dataset()
self._tco3_zm = dataset.mean(dim='lon')
@utils.return_on_failure("Error when loading 'vrm_zm'")
def __get_vrm_zm(self, vrm_zm, **kwarg):
"""Gets and standarises the vrm_zm data"""
if vrm_zm:
fnames = glob.glob(vrm_zm['dir'] + "/*.nc")
with xr.open_mfdataset(fnames) as dataset:
dataset = dataset.rename({
vrm_zm['name']: 'vrm_zm',
vrm_zm['coordinades']['time']: 'time',
vrm_zm['coordinades']['plev']: 'plev',
vrm_zm['coordinades']['lat']: 'lat',
vrm_zm['coordinades']['lon']: 'lon'
})['vrm_zm'].to_dataset()
self._vrm_zm = dataset.mean(dim='lon')
fnames = glob.glob(vrm_zm['dir'] + "/*.nc")
with xr.open_mfdataset(fnames) as dataset:
dataset = dataset.rename({
vrm_zm['name']: 'vrm_zm',
vrm_zm['coordinades']['time']: 'time',
vrm_zm['coordinades']['plev']: 'plev',
vrm_zm['coordinades']['lat']: 'lat',
vrm_zm['coordinades']['lon']: 'lon'
})['vrm_zm'].to_dataset()
self._vrm_zm = dataset.mean(dim='lon')
......@@ -4,6 +4,9 @@ import os
import yaml
import netCDF4
import xarray as xr
import logging
logger = logging.getLogger('o3skim.utils')
@contextmanager
......@@ -12,15 +15,32 @@ def cd(newdir):
prevdir = os.getcwd()
os.chdir(os.path.expanduser(newdir))
try:
logger.debug("Temp dir change to: '%s'", newdir)
yield
finally:
os.chdir(prevdir)
logger.debug("Restore directory: '%s'", prevdir)
def return_on_failure(message):
    """Decorator that logs exceptions instead of propagating them.

    On success the wrapped function's return value is passed through
    (the original wrapper discarded it). On failure the error is logged
    with its traceback and the wrapper returns None, so the caller can
    keep processing the remaining work items.

    :param message: Message to log when the wrapped call raises.
    """
    import functools  # local import keeps this helper self-contained

    def decorate(function):
        @functools.wraps(function)  # preserve name/docstring of the wrapped function
        def applicator(*args, **kwargs):
            try:
                return function(*args, **kwargs)
            except Exception:
                # Catch Exception, not a bare except: let SystemExit and
                # KeyboardInterrupt propagate so the process stays killable.
                # Log error with stack trace using root logger (not utils).
                logging.error(message, exc_info=True)
        return applicator
    return decorate
def load(yaml_file):
"""Loads the .yaml file with the sources configurations"""
with open(yaml_file, "r") as ymlfile:
return yaml.load(ymlfile)
config = yaml.load(ymlfile)
logging.debug("Configuration data: %s", config)
return config
def create_empty_netCDF(fname):
......@@ -34,5 +54,6 @@ def to_netcdf(path, name, dataset):
"""Creates or appends data to named netcdf files"""
years, dsx = zip(*dataset.groupby("time.year"))
fnames = [path + "/" + name + "_%s.nc" % y for y in years]
logging.info("Save dataset into: %s", fnames)
[create_empty_netCDF(fn) for fn in fnames if not os.path.isfile(fn)]
xr.save_mfdataset(dsx, fnames, mode='a')
ECMWF:
ERA-i:
vrm_zm: # Correct variable
name: vmro3
dir: Ecmwf/Erai
coordinades:
time: time
plev: level
lat: latitude
lon: longitude
tco3_zm: # Incorrect variable
name: non_existing_var
dir: Ecmwf/Erai
coordinades:
time: time
lat: latitude
lon: longitude
......@@ -2,6 +2,7 @@
import os
import shutil
import unittest
import xarray as xr
import glob
......@@ -16,8 +17,10 @@ class TestO3SKIM_sources(unittest.TestCase):
def setUp(self):
"""Loads and creates the test folders and files from test_sources.yaml"""
self.config = utils.load("tests/test_sources.yaml")
self.assertTrue(type(self.config) is dict)
self.config_base = utils.load("tests/sources_base.yaml")
self.assertTrue(type(self.config_base) is dict)
self.config_err = utils.load("tests/sources_err.yaml")
self.assertTrue(type(self.config_err) is dict)
self.create_mock_datasets()
self.backup_datasets()
......@@ -28,17 +31,24 @@ class TestO3SKIM_sources(unittest.TestCase):
def create_mock_datasets(self):
"""Creates mock data files according to the loaded configuration"""
for _, collection in self.config.items():
for _, collection in self.config_base.items():
for _, variables in collection.items():
for _, vinfo in variables.items():
path = "data/" + vinfo["dir"]
os.makedirs(path, exist_ok=True)
mockup_data.netcdf(path, **vinfo)
def clean_output(self):
    """Cleans output removing all folders at output"""
    with utils.cd("output"):
        for entry in os.listdir():
            # Only directories are removed; plain files are left alone.
            if os.path.isdir(entry):
                shutil.rmtree(entry)
def backup_datasets(self):
"""Loads the mock datasets into an internal variable"""
self.ds_backup = {}
for source, collection in self.config.items():
for source, collection in self.config_base.items():
self.ds_backup[source] = {}
for model, variables in collection.items():
self.ds_backup[source][model] = {}
......@@ -49,7 +59,7 @@ class TestO3SKIM_sources(unittest.TestCase):
def assert_with_backup(self):
"""Asserts the dataset in the backup is equal to the config load"""
for source, collection in self.config.items():
for source, collection in self.config_base.items():
for model, variables in collection.items():
for v, vinfo in variables.items():
paths = "data/" + vinfo["dir"] + "/*.nc"
......@@ -57,11 +67,11 @@ class TestO3SKIM_sources(unittest.TestCase):
xr.testing.assert_identical(
self.ds_backup[source][model][v], ds)
def test_000_SourcesFromConfig(self):
def test_001_SourcesFromConfig(self):
"""Creates the different sources from the configuration file"""
with utils.cd("data"):
ds = {name: sources.Source(name, collection) for
name, collection in self.config.items()}
name, collection in self.config_base.items()}
# CCMI-1 tco3_zm asserts
self.assertTrue('time' in ds['CCMI-1']._models['IPSL']._tco3_zm.coords)
......@@ -77,11 +87,11 @@ class TestO3SKIM_sources(unittest.TestCase):
# Checks the original data has not been modified
self.assert_with_backup()
def test_000_OutputFromSources(self):
def test_002_OutputFromSources(self):
"""Skims the data into the output folder"""
with utils.cd("data"):
ds = {name: sources.Source(name, collection) for
name, collection in self.config.items()}
name, collection in self.config_base.items()}
with utils.cd("output"):
[source.skim() for source in ds.values()]
......@@ -100,3 +110,23 @@ class TestO3SKIM_sources(unittest.TestCase):
# Checks the original data has not been modified
self.assert_with_backup()
# Removes output data for other tests
self.clean_output()
def test_003_SourceErrorDontBreak(self):
    """The execution does not stop by an error in source"""
    # Build the sources from the deliberately broken configuration.
    with utils.cd("data"):
        ds = {}
        for name, collection in self.config_err.items():
            ds[name] = sources.Source(name, collection)
    # Skimming must complete despite the bad 'tco3_zm' entry.
    with utils.cd("output"):
        for source in ds.values():
            source.skim()
    # ECMWF data skim asserts
    self.assertTrue(os.path.isdir("output/ECMWF_ERA-i"))
    self.assertTrue(os.path.exists("output/ECMWF_ERA-i/vrm_zm_2000.nc"))
    # Checks the original data has not been modified
    self.assert_with_backup()
    # Removes output data for other tests
    self.clean_output()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment