Commit 64f1231d authored by julian.gethmann's avatar julian.gethmann

Add feature to save and load data to a `CASSANDRA_JSON_DIR` directory

* Iff the environment variable `CASSANDRA_JSON_DIR` is provided, the
  data is saved there or loaded from there if the file already exists there.
* Fixes issue #14
parent 227a4dcf
......@@ -2,13 +2,18 @@
Changelog
=========
* [TODO] Add the raw PV name option to the command line interface
0.8.0 (2018-10-10)
------------------
* Add `CASSANDRA_JSON_DIR` environment variable support. If set, this directory is used for saving the JSON files and
reading the files if they are already in there. This should enable working offline and improve reproducibility.
If it is provided, the `directory` argument is ignored. This might break your code!
0.7.3 (2018-06-)
0.7.3 (2018-10-10)
------------------
* Add `fillingpattern` PV
* Add `connected` command line tool
* Add environment variable support for `CASSANDRA_HOST` and `CASSANDRA_PORT`
* Add `CassandraHelper().json_file_to_timeseries` to load previously saved JSON files.
0.7.2 (2018-05-25)
------------------
......@@ -25,7 +30,7 @@ Changelog
0.7.0 (2017-10-24)
------------------
- Renamed the `pandas` module to `pd` to avoid troubles with importing in Python 2 vs. Python 3 and tests.
* Renamed the `pandas` module to `pd` to avoid troubles with importing in Python 2 vs. Python 3 and tests.
0.6.0 (2017-10-11)
------------------
......
......@@ -141,11 +141,17 @@ Environment variables
~~~~~~~~~~~~~~~~~~~~~
Let us assume you tunnelled to a host in the machine network and now you do not have to use the archive gateway, but localhost as the host.
This can easily be accomplished by setting the environment variable ``CASSANDRA_HOST`` and then running your script.
code::
This can easily be accomplished by setting the environment variable ``CASSANDRA_HOST`` and then running your script. ::
$ CASSANDRA_HOST=localhost python script.py
.. note::
Unfortunately, it is not sufficient to set the environment variable from inside your script or, e.g., a Jupyter notebook.
If you want to back up the data and use it again later, you can use the ``CASSANDRA_JSON_DIR`` environment variable, which
needs to be a path to a non-existing (existing) directory, which will then be created, and the JSON files will be saved
to (loaded from) it. ::
$ CASSANDRA_JSON_DIR=data python script.py
In this case the `directory` option of the :class:`cassandra.cassandra.Cassandra` is ignored and replaced by ``CASSANDRA_JSON_DIR``.
......@@ -23,7 +23,7 @@ install_requires =
matplotlib
numpy
paramiko
tests_require = pytest; pytest-cov; pandas; tox
tests_require = pytest; pytest-cov; pytest-datadir; pandas; tox; packaging; mypy
[options.packages.find]
where = src
......@@ -86,6 +86,11 @@ exclude =
docs/conf.py
max-line-length = 100
[mypy]
check-untyped-defs = False
disallow-untyped-calls = False
ignore-missing-imports = True
[pyscaffold]
# PyScaffold's parameters when the project was created.
# This will be used when updating. Do not change!
......
......@@ -21,6 +21,7 @@
* CASSANDRA_HOST
* CASSANDRA_PORT
* CASSANDRA_JSON_DIR
"""
from __future__ import with_statement
......@@ -28,18 +29,16 @@ import datetime
import json
import logging
import os
import pathlib
from collections import namedtuple
from os import path
from sys import version_info
from typing import Dict # flake8: noqa
from typing import List # flake8: noqa
from typing import NamedTuple # flake8: noqa
from typing import Optional # flake8: noqa
from typing import Tuple # flake8: noqa
from typing import Union # flake8: noqa
from cassandra import __version__
__author__ = "Julian Gethmann"
__copyright__ = "Julian Gethmann"
__license__ = "mit"
......@@ -50,6 +49,14 @@ try: # Py3
except ImportError: # Py2
from urllib2 import URLError
JSON_Data = List[Optional[Dict[str, Union[int, Dict[str, str]]]]]
# DataSet = NamedTuple("dataset", [List[datetime.datetime], List[Union[List[float], float]]])
DataSet = namedtuple("dataset", ["timestamps", "values"])
# class DataSet(NamedTuple):
# datestamps: datetime.datetime
# values: float
class Pvs(object):
"""Provide shortcuts for some PV names.
......@@ -268,11 +275,11 @@ class CassandraHelper(object):
else:
return ret
JSONData = List[Optional[Dict[str, Union[int, Dict[str, str]]]]]
@staticmethod
def cassandra_json_to_timeseries(json_data):
# type: (JSONData) -> NamedTuple("dataset", [List[datetime.datetime], List[Union[List[float], float]]])
def cassandra_json_to_timeseries(
json_data, # type: JSON_Data
):
# type: (...) -> DataSet
"""Return lists of timestamps and values for given Cassandra's JSON data.
Args:
......@@ -287,15 +294,15 @@ class CassandraHelper(object):
Examples:
>>> CassandraHelper().cassandra_json_to_timeseries([])
dataset(timestamps=[], values=[])
DataSet(timestamps=[], values=[])
>>> CassandraHelper().cassandra_json_to_timeseries([{"value": [1.3e-07],
... "time": 1454698800000000000,
... "severity": {"level": "OK"}}])
dataset(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[1.3e-07])
DataSet(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[1.3e-07])
>>> CassandraHelper().cassandra_json_to_timeseries([{"value": [1.3e-07, 1.4e-07],
... "time": 1454698800000000000,
... "severity": {"level": "OK"}}])
dataset(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[[1.3e-07, 1.4e-07]])
DataSet(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[[1.3e-07, 1.4e-07]])
.. note:: This function doesn't check for sanity of the data, yet.
.. warning:: Returned values may be empty ones!
......@@ -314,8 +321,28 @@ class CassandraHelper(object):
if entry["severity"]["level"] == "OK"
]
dataset = namedtuple("dataset", ["timestamps", "values"])
return dataset(timestamps, values)
return DataSet(timestamps, values)
@staticmethod
def json_file_to_timeseries(filename):
    # type: (Union[str, pathlib.Path]) -> DataSet
    """Return lists of timestamps and values for given Cassandra's JSON file.

    The file is parsed as JSON and the parsed data is handed to
    `cassandra_json_to_timeseries`, which does the actual conversion.

    Args:
        filename (str|pathlib.Path): file name of a JSON list with Cassandra entries.

    Returns:
        namedtuple:
            The namedtuple `dataset` consists of a list of datetime objects called `timestamps` and
            a list of the corresponding values either as floats or
            wrapped inside a list if there are more than one value
            to be returned. The latter list is called `values`.

    Raises:
        IOError: if the file cannot be opened (``FileNotFoundError`` on Python 3
            if it does not exist).
        ValueError: if the file content is not valid JSON.

    .. seealso:: `cassandra_json_to_timeseries`
    """
    # The built-in ``open`` accepts path-like objects only from Python 3.6 on,
    # so convert explicitly to keep ``pathlib.Path`` arguments working on 3.4/3.5.
    with open(str(filename), "r") as fobj:
        json_data = json.load(fobj)
    # ``cassandra_json_to_timeseries`` is a static method; no throw-away instance needed.
    return CassandraHelper.cassandra_json_to_timeseries(json_data)
class Cassandra(object):
......@@ -339,6 +366,14 @@ class Cassandra(object):
If the environment variables `CASSANDRA_HOST` is set that will be used as
the HOSTNAME. Similar for `CASSANDRA_PORT`.
.. versionchanged:: 0.8.0
If the environment variable `CASSANDRA_JSON_DIR` is set and the directory does not exist, the data
will be downloaded to this directory. If it exists the JSON files from within this directory
will be used instead of downloading them from the database.
If not all data is available in this directory, it will be downloaded silently.
The json file format changed.
Raises:
URLError: in case the connection times out (e.g. if one has not
connected to the IBPT-LAN)
......@@ -353,7 +388,7 @@ class Cassandra(object):
dataset(timestamps=[datetime.datetime(2016, 4, 13, 10, 59, 59, 250177), datetime.datetime(2016, 4, 13, 11, 0, 0, 250285), datetime.datetime(2016, 4, 13, 11, 0, 1, 250484)], values=[0.9991628617721655, 0.9992209719948126, 0.9991912895254449])
"""
HOSTNAME = str(os.environ.get("CASSANDRA_HOST", "ankasr-archiver.anka.kit.edu"))
PORT = int(os.environ.get("CASSANDRA_PORT", 9812))
PORT = int(os.environ.get("CASSANDRA_PORT", "9812"))
logging.debug("Hostname: {}:{}".format(HOSTNAME, PORT))
RETRIES = 10
......@@ -375,36 +410,51 @@ class Cassandra(object):
count (int): somehow correlated to the number of data points fetched,
if not provided the raw data will be provided. Be careful, since
that can be very much data to load.
directory (str): directory to which the JSON data is written/from where it is read. Defaults to "."
directory (str): directory to which the JSON data is written/from where it is read.
Defaults to ".". If the environment variable `CASSANDRA_JSON_DIR` is set, that will
be used instead.
"""
self.start_time = CassandraHelper().cassandra_time2dt(start)
self.end_time = CassandraHelper().cassandra_time2dt(end)
self.pv = pv
self.count = count
self.directory = directory
self.directory = os.environ.get("CASSANDRA_JSON_DIR", directory)
if "CASSANDRA_JSON_DIR" in os.environ.keys():
self._backup = True
pathlib.Path(self.directory).absolute().mkdir(exist_ok=True, parents=True)
else:
self._backup = False
self.json_filename = "{pv}_{start}_{end}_{count}.json".format(
pv=self.pv,
start=self.start_time.strftime("%T"),
end=self.end_time.strftime("%T"),
start=self.start_time.strftime("%Y-%m-%dT%T%z"),
end=self.end_time.strftime("%Y-%m-%dT%T%z"),
count=self.count,
)
self.json_file = path.join(directory, self.json_filename)
self.json_file = path.join(self.directory, self.json_filename)
self.timeout = None # type: Optional[int]
def __enter__(self):
# type: (Cassandra) -> str
""" ..versionadded:: 0.3 """
import tempfile
self.directory = tempfile.mkdtemp()
self.json_file = path.join(self.directory, self.json_filename)
# type: (Cassandra) -> Tuple[List[datetime.datetime], List[Union[List[float], float]]]
""" ..versionadded:: 0.3
.. versionchanged:: 0.8
If the environment variable `CASSANDRA_JSON_DIR` is set, that directory will be used.
"""
if not self._backup:
import tempfile
self.directory = tempfile.mkdtemp()
self.json_file = path.join(self.directory, self.json_filename)
return self.get_json_local()
def __exit__(self, ctx_type, ctx_value, ctx_traceback):
# type: (Cassandra, Optional[Exception], Optional[str], Optional[TracebackType]) -> None
""" ..versionadded:: 0.3 """
from shutil import rmtree
rmtree(self.directory)
if not self._backup:
from shutil import rmtree
rmtree(self.directory)
@staticmethod
def check_connection():
......
......@@ -40,7 +40,7 @@ def _get_mean_values(
# pass
output_str = {
"full": "{name} = {val} \pm {std}",
"full": "{name} = {val} \\pm {std}",
"raw": "{val}",
"str": "{val}",
}
......
mypy
# Add requirements only needed for your unittests and during development here.
# They will be installed automatically when running `python setup.py test`.
# ATTENTION: Don't remove pytest-cov and pytest as they are needed.
packaging
pandas
pytest
pytest-cov
pytest-datadir
sphinx
tox
......@@ -8,11 +8,15 @@
import datetime
import json
import os
import pathlib
import shutil
from contextlib import suppress
from importlib import reload
import cassandra
import pytest
from cassandra.cassandra import Cassandra
from cassandra.cassandra import Cassandra, CassandraHelper
from packaging import version
from .conftest import request_openurl
......@@ -26,6 +30,30 @@ except ImportError: # Py2
import urllib2 as request
@pytest.mark.usefixtures("cleandir", "setup_")
class TestCassandraHelper(object):
@pytest.fixture()
def json_file(self, cleandir):
to = pathlib.Path(cleandir) / "PEAKTUNE2.json"
shutil.copyfile((pathlib.Path(__file__).parent /
"data/A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json"), to)
return to.absolute()
def test_json_file_to_timeseries(self, json_file):
res = CassandraHelper().json_file_to_timeseries(json_file)
assert isinstance(res, tuple)
data = json.loads(
(pathlib.Path(__file__).parent /
"data/A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json").read_text())
assert res[0] == [datetime.datetime.fromtimestamp(d["time"] / 1e9) for d in data]
assert res[1] == [d["value"][0] for d in data]
assert res.timestamps == [datetime.datetime.fromtimestamp(d["time"] / 1e9) for d in data]
assert res.values == [d["value"][0] for d in data]
with pytest.raises(FileNotFoundError):
CassandraHelper().json_file_to_timeseries("/dev/false")
@pytest.mark.usefixtures("cleandir", "setup_")
class TestCassandraClass(object):
......@@ -41,7 +69,12 @@ class TestCassandraClass(object):
assert os.path.abspath(cas2.directory) == os.path.abspath(os.path.curdir)
# assert pathlib.Path(cas2.directory).absolute() == pathlib.Path.cwd()
assert cas2.json_filename == "A:SR:BeamInfo:01:Energy_10:10:10_10:10:30_None.json"
if version.parse(cassandra.__version__) <= version.parse("0.7.2"):
assert cas2.json_filename == "A:SR:BeamInfo:01:Energy_10:10:10_10:10:30_None.json"
else:
assert cas2.json_filename == \
"A:SR:BeamInfo:01:Energy_2017-11-02T10:10:10_2017-11-02T10:10:30" \
"_None.json"
def test_gen_url(self, cas, setup_):
url = cas.gen_url()
......@@ -101,6 +134,39 @@ class TestCassandraClass(object):
assert cas.HOSTNAME == "test"
assert cas.PORT == 1234
@pytest.fixture(scope="function")
def env_backup(self, monkeypatch):
with monkeypatch.context() as m:
m.setenv("CASSANDRA_JSON_DIR", "backup_test")
reload(cassandra.cassandra)
yield
reload(cassandra.cassandra)
with suppress(FileNotFoundError):
shutil.rmtree("backup_test")
def test_backup(self, setup_, env_backup, monkeypatch):
start, end, pv = setup_
def off(url, timeout):
raise URLError(
"Request had a timeout. Maybe you're not inside the IBPT-CN-LAN or provided a wrong PV name"
)
assert not pathlib.Path("backup_test").is_dir()
with cassandra.cassandra.Cassandra(start, end, pv) as cas:
pass
assert pathlib.Path("backup_test").is_dir()
assert pathlib.Path("backup_test/A:SR:BeamInfo:01:Energy_2017-11-02T10:10:10_2017-11"
"-02T10:10:30_None.json").is_file(), "Backup file does not exist (1)"
monkeypatch.setattr(request, "urlopen", off)
with cassandra.cassandra.Cassandra(start, end, pv) as cas2:
pass
assert pathlib.Path("backup_test").is_dir()
assert pathlib.Path("backup_test/A:SR:BeamInfo:01:Energy_2017-11-02T10:10:10_2017-11"
"-02T10:10:30_None.json").is_file(), "Backup file does not exist (2)"
assert cas[0] == cas2[0]
assert cas[1] == cas2[1]
def test_download_cassandra_data_online(self, cas, monkeypatch):
with open(
os.path.dirname(os.path.abspath(__file__)) +
......@@ -122,16 +188,16 @@ class TestCassandraClass(object):
with pytest.raises(URLError, match=r".*[Tt]imeout.*"):
cas._download_cassandra_data()
def test_dump_cassandra_data(self, cas, cleandir, monkeypatch):
def test_dump_cassandra_data(self, cas, cleandir, monkeypatch, shared_datadir):
assert not os.path.isfile(os.path.join(cleandir, cas.json_filename))
# assert not pathlib.Path(cleandir / cas.json_filename).is_file()
assert not pathlib.Path(cleandir / cas.json_filename).is_file()
monkeypatch.setattr(request, "urlopen", request_openurl)
assert str(cleandir / cas.json_filename) == cas.dump_cassandra_data()
assert os.path.isfile(os.path.join(cleandir, cas.json_filename))
# assert pathlib.Path(cleandir / cas.json_filename).is_file()
assert pathlib.Path(cleandir / cas.json_filename).is_file()
with open(str(cleandir / cas.json_filename), "r") as got, open(
os.path.dirname(os.path.abspath(__file__)) +
"/data/A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json", "r") as expected:
shared_datadir / "A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json",
"r") as expected:
assert json.load(expected) == json.load(got)
def test_with(self, setup_dl, monkeypatch):
......
......@@ -8,7 +8,6 @@ Unit tests for cassandra.pd
import datetime
import doctest
import json
import os
import sys
import unittest
from typing import List, NamedTuple
......@@ -58,7 +57,7 @@ class TestCassandraPD(object):
plane=plane)).open("rb")
@pytest.mark.skipif(sys.version_info < (3, 4), reason="requires python3.4")
def test_pvs2pd(self, monkeypatch, setup_pd, cleandir):
def test_pvs2pd(self, monkeypatch, setup_pd, cleandir, shared_datadir):
monkeypatch.setattr(request, "urlopen", self.urlopen_nus)
ret = pvs2pd(*setup_pd)
assert [0.81171229137420042, 0.72489568710029628] == list(ret.mean())
......@@ -69,16 +68,12 @@ class TestCassandraPD(object):
with pytest.raises(ValueError):
pvs2pd(*setup_pd, upsample=True)
fn = "A:SR:BBB:01:X:SRAM:PEAKTUNE2_18:28:36_18:28:51_None.json"
fn_in = "A:SR:BBB:01:X:SRAM:PEAKTUNE2_18:28:36_18:28:51_None.json"
fn = "A:SR:BBB:01:X:SRAM:PEAKTUNE2_2016-08-05T18:28:36_2016-08-05T18:28:51_None.json"
assert not pathlib.Path(cleandir / fn).is_file()
pvs2pd(*setup_pd, save_local=True)
with open(str(cleandir / fn), "r") as got, open(
"{abspath}/data/{fn}".format(
abspath=os.path.dirname(os.path.abspath(__file__)),
fn=fn,
),
"r",
) as expected:
with open(str(cleandir / fn), "r") as got, \
open((shared_datadir / fn_in).absolute(), "r", ) as expected:
assert json.load(expected) == json.load(got)
......
......@@ -10,8 +10,10 @@ commands =
deps =
pytest
pytest-cov
pytest-datadir
-r{toxinidir}/requirements.txt
pandas
packaging
[testenv:flake8]
changedir = {toxinidir}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment