Commit 46612d9c authored by BorjaEst's avatar BorjaEst

Remove source class

parent 276e0ba5
#!/usr/bin/env python3
"""
o3skim is a tool for data pre-processing of ozone applications:
- lon_mean: Mean operation over longitude axis.
- lat_mean: Mean operation over latitude axis.
"""
import argparse
import logging
import os
import warnings

import o3skim
from o3skim import utils
warnings.simplefilter(action='ignore', category=FutureWarning)
def cmdline_args():
p = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=__doc__)
p.add_argument(
"-f", "--sources_file",
type=str, default="./sources.yaml",
help="custom sources YAML configuration (default: %(default)s)")
p.add_argument(
"-s", "--split_by",
type=str, default=None,
choices=['year', 'decade'],
help="Period time to split output (default: %(default)s)")
p.add_argument(
"-v", "--verbosity",
type=str, default='ERROR',
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
help="Sets the logging level (default: %(default)s)")
p.add_argument(
"operations", nargs='+',
type=str,
choices=['lon_mean', 'lat_mean'],
help="o3skim operations to perform")
    return p.parse_args()
@utils.return_on_failure("Error processing model")
def skim_process(name, operations, split_by, configuration):
# Process start
logging.info("Process start for model %s", name)
# Data loading
logging.info("Loading model %s data", name)
with utils.cd('data'):
model, metadata = o3skim.loading(**configuration)
# Data processing
logging.info("Processing model %s data", name)
processed = o3skim.processing(model, operations)
# Data grouping
logging.info("Grouping processed model %s data", name)
years, datasets = o3skim.group(processed, split_by)
# Create output directory saving
logging.info("Creating output directory %s", name)
with utils.cd('output'):
os.makedirs(name, exist_ok=True)
# Data saving
logging.info("Saving processed model %s data", name)
with utils.cd('output/' + name):
o3skim.saving(datasets, split_by, years)
# Metadata saving
logging.debug("Creating model %s metadata.yaml file", name)
with utils.cd('output/' + name):
utils.save("metadata.yaml", metadata)
# Process end
logging.info("Process end for model %s", name)
if __name__ == '__main__':
args = cmdline_args()
# Set logging level
logformat = '%(asctime)s %(name)-24s %(levelname)-8s %(message)s'
    logging.basicConfig(level=getattr(logging, args.verbosity),
                        format=logformat)
# Configuration load
logging.info("Lookinf for config at: '%s'", args.sources_file)
config = o3skim.load(args.sources_file)
# Create sources
logging.info("Loading data from './data' ")
with o3skim.cd("data"):
ds = {name: o3skim.Source(name, **collection) for
name, collection in config.items()}
logging.info("Looking for config at: '%s'", args.sources_file)
config = utils.load(args.sources_file)
# Processing loop
logging.info("Processing loop for models")
for model_name in config:
skim_process(
name=model_name,
operations=args.operations,
split_by=args.split_by,
configuration=config[model_name])
    # End of program
    logging.info("End of program")
"""
O3as package with classes and utilities to handle ozone data skimming.
The main usage of the package resides on the generation of "Source"
class instances from the loading of netCDF files though collection
descriptions generated by source files.
However, as the generation of collection descriptions in a form of dict
type might be difficult, a yaml file can be converted into the required
variable by using the util "load" with the path to the 'sources.yaml'
file as input.
Once the desired collection is formated in the shape of dict variable,
a source object can be created from using the class "Source". During
this step the model data is loaded into the source instance involving
on the background process the data sorting and standardization. Note
that those models with errors on the collection specification are not
loaded neither interrupt the loading process. However, notice of those
event is logged on strerr together with the stack execution info.
After the data has been loaded the object intstance correctly created,
it is possible to use the internal methods to generate a reduced output
on the current folder.
To simplify the management of the data and output directories, the
package offest a "cd" utility which can be used together with a "with"
statement to change temporary the location of the execution folder.
"""
import logging
import xarray as xr
from o3skim import extended_xarray
from o3skim import source
from o3skim import standardization
from o3skim import loads
from o3skim import operations
from o3skim import utils
logger = logging.getLogger('o3skim')
def loading(tco3_zm=None, vmro3_zm=None, metadata=None):
    logger.debug("Loading ozone variables data")
    if metadata is None:
        metadata = {}  # Avoid a shared mutable default argument
    dataset = xr.Dataset()

    def raise_conflict(d1, d2):
        raise Exception("Conflict merging {}, {}".format(d1, d2))

    if tco3_zm:
        datarray, ds_attrs, meta = loads.tco3(**tco3_zm)
        dataset['tco3_zm'] = datarray
        utils.mergedicts(dataset.attrs, ds_attrs, raise_conflict)
        metadata['tco3_zm'] = meta
    if vmro3_zm:
        datarray, ds_attrs, meta = loads.vmro3(**vmro3_zm)
        dataset['vmro3_zm'] = datarray
        utils.mergedicts(dataset.attrs, ds_attrs, raise_conflict)
        metadata['vmro3_zm'] = meta
    return dataset, metadata
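# Expected keyword shape for loading(), mirroring the sources YAML (the
# variable name and paths here are illustrative assumptions):
#   loading(tco3_zm={'name': 'tco3', 'paths': 'tco3_*.nc',
#                    'coordinates': {'time': 'time', 'lat': 'lat',
#                                    'lon': 'lon'}})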
def processing(dataset, actions):
    logger.debug("Processing queue: %s", actions)
    actions = actions.copy()  # Do not edit the original list
    operation = actions.pop(0)  # FIFO: apply operations in the order given
    processed = operations.run(operation, dataset)
    if actions:
        processed = processing(processed, actions)
    return processed
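# Example: processing(ds, ['lon_mean', 'lat_mean']) runs one operation and
# recurses on the remaining queue until it is empty.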
def group(dataset, split_by):
    logger.debug("Splitting dataset by %s", split_by)
    if not split_by:
        return [None], [dataset]
    elif split_by == 'year':
        def delta_map(x): return x.year
    elif split_by == 'decade':
        def delta_map(x): return x.year // 10 * 10
    else:
        message = "Bad input split_by {} use {{None, 'year', 'decade'}}"
        raise KeyError(message.format(split_by))
    years = dataset.indexes['time'].map(delta_map)
    groups = dataset.groupby(xr.DataArray(years))
    return tuple(zip(*groups))
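# group() returns two parallel tuples consumed as (years, datasets); e.g.
# split_by='decade' could yield ((1980, 1990), (<Dataset>, <Dataset>)).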
Source = source.Source
load = utils.load
cd = utils.cd
def saving(datasets, split_by=None, years=None):
    if not split_by:
        def path(v, _): return "{}.nc".format(v)
    elif split_by == 'year':
        def path(v, y): return "{}_{}-{}.nc".format(v, y, y + 1)
    elif split_by == 'decade':
        def path(v, y): return "{}_{}-{}.nc".format(v, y, y + 10)
    else:
        message = "Bad input split_by {} use {{None, 'year', 'decade'}}"
        raise KeyError(message.format(split_by))
    for variable in datasets[0].data_vars:
        xr.save_mfdataset(
            datasets=[ds.o3[variable].dataset for ds in datasets],
            paths=[path(variable, year) for year in years])
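# Naming sketch: with split_by='decade', path('tco3_zm', 1980) resolves to
# 'tco3_zm_1980-1990.nc', so one file is written per variable and period.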
"""
xarray extension module to provide model class and functions to
handle tco3 and vmro3 operations and data skimming.
"""
import logging

import xarray as xr

from o3skim import utils
logger = logging.getLogger('extended_xr')
mean_coord = 'lon'
@xr.register_dataset_accessor("model")
class ModelAccessor:
@xr.register_dataset_accessor("o3")
class O3Accessor:
def __init__(self, xarray_obj):
self._model = xarray_obj
self._metadata = {}
@property
def tco3(self):
"""Return the total ozone column of this dataset."""
if "tco3_zm" in list(self._model.var()):
dataset = self._model["tco3_zm"].to_dataset()
dataset.attrs = self._model.attrs
return dataset
else:
return None
def __getitem__(self, o3variable):
if o3variable == 'tco3_zm':
return self._model.tco3
if o3variable == 'vmro3_zm':
return self._model.vmro3
@property
def vmro3(self):
"""Return the ozone volume mixing ratio of this dataset."""
if "vmro3_zm" in list(self._model.var()):
dataset = self._model["vmro3_zm"].to_dataset()
dataset.attrs = self._model.attrs
return dataset
else:
return None
@property
def metadata(self):
"""Returns the metadata property"""
return self._metadata
class O3Variable:
def __init__(self, xarray_obj, name):
self._model = xarray_obj
self._name = name
def add_metadata(self, metadata):
"""Merges the input metadata with the model metadata"""
utils.mergedicts(self._metadata, metadata)
@property
def dataset(self):
dataset = self._model[self._name].to_dataset()
dataset.attrs = self._model.attrs
return dataset
def set_metadata(self, metadata):
"""Sets the metadata to the input variable."""
self._metadata = metadata
def groupby_year(self):
"""Returns a grouped dataset by year"""
logger.debug("Performing group by year on model")
def delta_map(x): return x.year
years = self._model.indexes['time'].map(delta_map)
return self._model.groupby(xr.DataArray(years))
@xr.register_dataset_accessor("tco3")
class TCO3Accessor(O3Variable):
def __init__(self, xarray_obj):
O3Variable.__init__(self, xarray_obj, name='tco3_zm')
def groupby_decade(self):
"""Returns a grouped dataset by decade"""
logger.debug("Performing group by decade on model")
def delta_map(x): return x.year // 10 * 10
years = self._model.indexes['time'].map(delta_map)
return self._model.groupby(xr.DataArray(years))
def skim(self):
"""Skims model producing reduced dataset"""
logger.debug("Skimming model")
skimmed = self._model.mean(mean_coord)
skimmed.attrs = self._model.attrs
for var in self._model:
skimmed[var].attrs = self._model[var].attrs
return skimmed
@xr.register_dataset_accessor("vmro3")
class VMO3Accessor(O3Variable):
def __init__(self, xarray_obj):
O3Variable.__init__(self, xarray_obj, name='vmro3_zm')
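# Accessor usage sketch on a loaded dataset 'ds' (illustrative; assumes a
# 'tco3_zm' variable is present):
#   ds.o3['tco3_zm'].dataset   # standalone tco3 dataset via the 'o3' accessor
#   ds.tco3.dataset            # same result through the 'tco3' accessor
#   ds.model.skim()            # zonal mean over 'lon' via the 'model' accessor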
"""
Module in charge of data loading.
"""
import logging

import xarray as xr

from o3skim import standardization
logger = logging.getLogger('load')
def tco3(name, paths, coordinates, metadata=None):
    logger.debug("Loading tco3 data from: %s", paths)
    if metadata is None:
        metadata = {}  # Avoid a shared mutable default argument
    with xr.open_mfdataset(paths) as dataset:
datarray = standardization.tco3(
array=dataset[name],
coord=coordinates)
ds_attrs = dataset.attrs
return datarray, ds_attrs, metadata
def vmro3(name, paths, coordinates, metadata=None):
    logger.debug("Loading vmro3 data from: %s", paths)
    if metadata is None:
        metadata = {}  # Avoid a shared mutable default argument
    with xr.open_mfdataset(paths) as dataset:
datarray = standardization.vmro3(
array=dataset[name],
coord=coordinates)
ds_attrs = dataset.attrs
return datarray, ds_attrs, metadata
"""
Module in charge of operations implementation.
"""
import logging
logger = logging.getLogger('operations')
def run(name, dataset):
if name == 'lon_mean':
return lon_mean(dataset)
elif name == 'lat_mean':
return lat_mean(dataset)
else:
message = "Bad selected operation: {}"
raise KeyError(message.format(name))
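# Dispatch sketch: run('lon_mean', dataset) is equivalent to calling
# lon_mean(dataset); any other name raises KeyError.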
def lon_mean(dataset):
logger.debug("Calculating mean over model longitude")
skimmed = dataset.mean('lon')
skimmed.attrs = dataset.attrs
    for var in dataset.data_vars:
skimmed[var].attrs = dataset[var].attrs
return skimmed
def lat_mean(dataset):
logger.debug("Calculating mean over model latitude")
skimmed = dataset.mean('lat')
skimmed.attrs = dataset.attrs
    for var in dataset.data_vars:
skimmed[var].attrs = dataset[var].attrs
return skimmed
"""
Module in charge of Source class implementation.
Sources are responsible of loading netCDF collections from data and
do the standardization during the process. Each source is compose
therefore from 0 to N models which can be accessed as subscriptable
object by it's model name.
It also implement internal methods which can be used to operate the
model data. For example using the method "skim" generates a reduced
version of the models data on the current folder.
"""
import logging
import os

import xarray as xr

from o3skim import extended_xarray  # noqa: F401 (registers xarray accessors)
from o3skim import standardization
from o3skim import utils
logger = logging.getLogger('source')
class Source:
r"""Conceptual class for a data source. It is produced by the
loading and standardization of multiple data models.
    The currently supported model variables are "tco3_zm" and "vmro3_zm",
which should contain the information on how to retrieve the data
from the netCDF collection.
:param name: Name to provide to the source.
:type name: str
:param metadata: Source metadata, defaults to {}.
:type metadata: dict, optional
:param \**collections: kwarg where each 'key' is the model
name and its 'value' another dictionary with the variable
loading statements for that model.
{name:str, paths: str, coordinates: dict, metadata: dict}
"""
def __init__(self, name, metadata={}, **collections):
self._name = name
self._metadata = metadata
self._models = {}
logger.info("Loading source '%s'", name)
        for model_name, specifications in collections.items():
            logger.info("Loading model '%s'", model_name)
            model = _load_model(**specifications)
            if model:
                self._models[model_name] = model
def __getitem__(self, model_name):
return self._models[model_name]
@property
def name(self):
return self._name
@property
def models(self):
return list(self._models.keys())
@property
def metadata(self):
return self._metadata
def skim(self, groupby=None):
"""Request to skim all source data into the current folder.
The output is generated into multiple folder where
each model output is generated in a forder with the source
name defined at the source initialization followed by
'_' and the model name: "<source_name>_<model_name>".
If there was metadata added when creating the source, it is
delivered into a "metadata.yaml" file on the directory.
:param groupby: How to group output (None, 'year', 'decade').
:type groupby: str, optional
"""
for model in self._models:
dirname = "{}_{}".format(self._name, model)
os.makedirs(dirname, exist_ok=True)
logger.info("Skimming data from '%s'", dirname)
with utils.cd(dirname):
metadata = {} # copy() does not recurse inside dict
utils.mergedicts(metadata, self.metadata)
utils.mergedicts(metadata, self[model].model.metadata)
_skim(self[model], delta=groupby, metadata=metadata)
@utils.return_on_failure("Error when loading model", default=None)
def _load_model(tco3_zm=None, vmro3_zm=None, metadata={}):
"""Loads a model merging standardized data from specified datasets.
:param tco3_zm: tco3 variable description, defaults to None.
:type tco3_zm: {name:str, paths:str,
coordinates:{lat:str, lon:str, time:str}},
optional
:param vmro3_zm: vmro3 variable description, defaults to None.
:type vmro3_zm: {name:str, paths:str,
                     coordinates:{lat:str, lon:str, plev:str, time:str}},
optional
:param metadata: Source metadata, defaults to {}.
:type metadata: dict, optional
:return: Dataset with specified variables.
:rtype: xarray.Dataset
"""
dataset = xr.Dataset()
dataset.model.set_metadata(metadata)
    def raise_conflict(d1, d2):
        raise Exception("Conflict merging {}, {}".format(d1, d2))
if tco3_zm:
logger.debug("Loading tco3_zm into model")
with xr.open_mfdataset(tco3_zm['paths']) as load:
dataset['tco3_zm'] = standardization.standardize_tco3(
array=load[tco3_zm['name']],
coordinates=tco3_zm['coordinates'])
metadata = {'tco3_zm': tco3_zm.get('metadata', {})}
dataset.model.add_metadata(metadata)
utils.mergedicts(dataset.attrs, load.attrs, raise_conflict)
if vmro3_zm:
logger.debug("Loading vmro3_zm into model")
with xr.open_mfdataset(vmro3_zm['paths']) as load:
dataset['vmro3_zm'] = standardization.standardize_vmro3(
array=load[vmro3_zm['name']],
coordinates=vmro3_zm['coordinates'])
metadata = {'vmro3_zm': vmro3_zm.get('metadata', {})}
dataset.model.add_metadata(metadata)
utils.mergedicts(dataset.attrs, load.attrs, raise_conflict)
return dataset
def _skim(model, delta=None, metadata=None):
    """Skims the model, producing reduced dataset files. The period used
    to split the output can be indicated with 'delta'. If metadata is
    provided in the form of a dict, a 'metadata.yaml' file is generated
    together with the skimmed output.

    :param model: Dataset with ModelAccessor to skim.
    :type model: xarray.Dataset

    :param metadata: Model metadata to save as yaml, defaults to None.
    :type metadata: dict, optional

    :param delta: How to group output (None, 'year', 'decade').
    :type delta: str, optional
    """
logger.debug("Skimming model with delta {}".format(delta))
skimmed = model.model.skim()
if delta == 'year':
def tco3_path(y): return "tco3_zm_{}-{}.nc".format(y, y + 1)
def vmro3_path(y): return "vmro3_zm_{}-{}.nc".format(y, y + 1)
groups = skimmed.model.groupby_year()
elif delta == 'decade':
def tco3_path(y): return "tco3_zm_{}-{}.nc".format(y, y + 10)
def vmro3_path(y): return "vmro3_zm_{}-{}.nc".format(y, y + 10)
groups = skimmed.model.groupby_decade()
else:
def tco3_path(_): return "tco3_zm.nc"
def vmro3_path(_): return "vmro3_zm.nc"
groups = [(None, skimmed), ]
years, datasets = zip(*groups)
if skimmed.model.tco3:
logger.debug("Saving skimed tco3 into files")
xr.save_mfdataset(
datasets=[ds.model.tco3 for ds in datasets],
paths=[tco3_path(year) for year in years]
)
if skimmed.model.vmro3:
logger.debug("Saving skimed vmro3 into files")
xr.save_mfdataset(
datasets=[ds.model.vmro3 for ds in datasets],
paths=[vmro3_path(year) for year in years]
)
if metadata:
logger.debug("Creating metadata.yaml file")
utils.save(file_name="metadata.yaml", metadata=metadata)
"""Module in charge of dataset standardization when loading models."""
import logging
import unittest
import xarray as xr
import pandas as pd
import numpy as np
from o3skim import utils
logger = logging.getLogger('o3skim.standardization')
@utils.return_on_failure("Error when loading '{0}'".format('tco3_zm'),
default=xr.Dataset())
def tco3(array, coord):