Commit 1d133bda authored by julian.gethmann's avatar julian.gethmann

Merge branch 'dev-iss14' into 'master'

Merge dev-iss14

Closes #14

See merge request las-software/15-3-DataProcessing/cassandra!1
parents d8cddb7e eab182cc
......@@ -40,7 +40,7 @@ before_script:
image: "python:3.6"
- python test
- TZ='Europe/Berlin' python test
......@@ -2,13 +2,18 @@
* [TODO] Add the raw PV name option to the command line interface
0.8.0 (2018-10-10)
* Add `CASSANDRA_JSON_DIR` environment variable support. If set this directory is used for saving the JSON files and
reading the files files if they are already in there. This should enable working offline and improve reproducibility.
If it is provided the `directory` argument is ignored. This might break your code!
0.7.3 (2018-06-)
0.7.3 (2018-10-10)
* Add `fillingpattern` PV
* Add `connected` command line tool
* Add environment variable support for `CASSANDRA_HOST` and `CASSANDRA_PORT`
* Add `CassandraHelper().json_file_to_timeseries` to load previously saved JSON files.
0.7.2 (2018-05-25)
......@@ -25,7 +30,7 @@ Changelog
0.7.0 (2017-10-24)
- Renamed the `pandas` module to `pd` to avoid troubles with importing in Python 2 vs. Python 3 and tests.
* Renamed the `pandas` module to `pd` to avoid troubles with importing in Python 2 vs. Python 3 and tests.
0.6.0 (2017-10-11)
......@@ -4,6 +4,16 @@ Background information and general remark
Cassandra_ is the current (2018) archiving solution used at KARA. There is a `JavaScript-Object-Notation`_-API available, that can be accessed from within the IBPT-LAN (CN, VPN or
Usage with different hosts or ports
In case you want to run this scripts or library from a place where the host or port does not fit the presets, you need to provide them as environment variables.
For example if you want to run your script ```` and you forwarded ``ankacomm`` via SSH to the port 9999,
you can then call your script in bash like::
CASSANDRA_HOST=localhost; CASSANDRA_PORT=9999 python3
......@@ -136,3 +136,22 @@ current directory.
.. _pandas:
.. _`offset-aliases`:
Environment variables
Let us assume you tunnelled to a host in the machine network and now you do not have to use the archive gateway, but localhost as the host.
This can easily be accomplished by setting the environment variable ``CASSANDRA_HOST`` and then running your script. ::
$ CASSANDRA_HOST=localhost python
.. note::
Unfortunately it is not sufficient to set the environment variable from inside your script or e. g. Jupyter notebook.
If you want to backup the data and later use it again you can use the ``CASSANDRA_JSON_DIR`` environment variable which
needs to be a path to an non existing (existing) directory which then will be created and the JSON files will be saved
to (loaded from). ::
$ CASSANDA_JSON_DIR=data python
In this case the `directory` option of the :class:`cassandra.cassandra.Cassandra` is ignored and replaced by ``CASSANDRA_JSON_DIR``.
......@@ -16,6 +16,7 @@ If I write Cassandra I am speaking of this Python class, if I write cassanda I a
Examples <examples>
(Command line) Scripts <scripts>
Module Reference <api/cassandra>
Error messages and trouble shooting <troubleshooting>
Changelog <changelog>
License <license>
Authors <authors>
.. _troubleshooting:
Error messages
Typical causes for the URLError with the message `Maybe you're not inside the ANKA-LAN` are
* You are not inside the IBPT office network nor in it via VPN. You should check this by getting the following URL. Try to do it with wget or curl, because maybe you are working on a different computer than you think ;)
* One of your provided ``PV`` names is wrong
* Your start time is later that your end time
* The cassandra archive is not responding correctly. Did you really double checked the other causes? Then get in contact with Sebastian Marsching or write a JIRA ticket.
......@@ -23,7 +23,7 @@ install_requires =
tests_require = pytest; pytest-cov; pandas; tox
tests_require = pytest; pytest-cov; pytest-datadir; pandas; tox; packaging; mypy
where = src
......@@ -86,6 +86,11 @@ exclude =
max-line-length = 100
check-untyped-defs = False
disallow-untyped-calls = False
ignore-missing-imports = True
# PyScaffold's parameters when the project was created.
# This will be used when updating. Do not change!
......@@ -16,6 +16,12 @@
And finally the :class:`Pvs` class simply provides a :obj:`dict` of some
EPICS Process Variables I often used or could not remember.
Possible environment variables are
from __future__ import with_statement
......@@ -23,18 +29,16 @@ import datetime
import json
import logging
import os
import pathlib
from collections import namedtuple
from os import path
from sys import version_info
from typing import Dict # flake8: noqa
from typing import List # flake8: noqa
from typing import NamedTuple # flake8: noqa
from typing import Optional # flake8: noqa
from typing import Tuple # flake8: noqa
from typing import Union # flake8: noqa
from cassandra import __version__
__author__ = "Julian Gethmann"
__copyright__ = "Julian Gethmann"
__license__ = "mit"
......@@ -45,6 +49,14 @@ try: # Py3
except ImportError: # Py2
from urllib2 import URLError
JSON_Data = List[Optional[Dict[str, Union[int, Dict[str, str]]]]]
# DataSet = NamedTuple("dataset", [List[datetime.datetime], List[Union[List[float], float]]])
DataSet = namedtuple("dataset", ["timestamps", "values"])
# class DataSet(NamedTuple):
# datestamps: datetime.datetime
# values: float
class Pvs(object):
"""Provide shortcuts for some PV names.
......@@ -116,6 +128,10 @@ class Pvs(object):
"clic_I22m_temp": "A:ID-S1:Wig:01:JB:Temp:07",
"clic_cooler4": "A:ID-S1:Wig:01:JB:Temp:19",
"clic_cooler3": "A:ID-S1:Wig:01:JB:Temp:18",
"clic_cooler2": "A:ID-S1:Wig:01:JB:Temp:15",
"clic_cooler1": "A:ID-S1:Wig:01:JB:Temp:13",
"clic_cooler2_20K": "A:ID-S1:Wig:01:JB:Temp:14",
"clic_cooler1_20K": "A:ID-S1:Wig:01:JB:Temp:12",
"clic_he_bar": "A:ID-S1:Wig:01:JB:Pressure:Abs",
"clic_iso_vac_mbar": "A:ID-S1:Wig:01:JB:Pressure:Vac",
"delta_ps1": "A:ID-S1:PS:MCV-01:Current",
......@@ -259,11 +275,11 @@ class CassandraHelper(object):
return ret
JSONData = List[Optional[Dict[str, Union[int, Dict[str, str]]]]]
def cassandra_json_to_timeseries(json_data):
# type: (JSONData) -> NamedTuple("dataset", [List[datetime.datetime], List[Union[List[float], float]]])
def cassandra_json_to_timeseries(
json_data, # type: JSON_Data
# type: (...) -> DataSet
"""Return lists of timestamps and values for given Cassandra's JSON data.
......@@ -278,15 +294,15 @@ class CassandraHelper(object):
>>> CassandraHelper().cassandra_json_to_timeseries([])
dataset(timestamps=[], values=[])
DataSet(timestamps=[], values=[])
>>> CassandraHelper().cassandra_json_to_timeseries([{"value": [1.3e-07],
... "time": 1454698800000000000,
... "severity": {"level": "OK"}}])
dataset(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[1.3e-07])
DataSet(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[1.3e-07])
>>> CassandraHelper().cassandra_json_to_timeseries([{"value": [1.3e-07, 1.4e-07],
... "time": 1454698800000000000,
... "severity": {"level": "OK"}}])
dataset(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[[1.3e-07, 1.4e-07]])
DataSet(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[[1.3e-07, 1.4e-07]])
.. note:: This function doesn't check for sanity of the data, yet.
.. warning:: Returned values may be empty ones!
......@@ -305,8 +321,28 @@ class CassandraHelper(object):
if entry["severity"]["level"] == "OK"
dataset = namedtuple("dataset", ["timestamps", "values"])
return dataset(timestamps, values)
return DataSet(timestamps, values)
def json_file_to_timeseries(filename):
# type: (Union[str, pathlib.Path]) -> DataSet
"""Return lists of timestamps and values for given Cassandra's JSON file.
filename (str|patlib.Path): file name of a JSON list with Cassandra entries.
The namedtuple `dataset` consists of a list of datetime objects called `timestamps` and
a list of the corresponding values either as floats or
wrapped inside a list if there are more than one value
to be returned. The latter list is called `values`.
.. seealso:: `cassandra_json_to_timeseries`
with open(filename, "r") as fobj:
json_data = json.load(fobj)
return CassandraHelper().cassandra_json_to_timeseries(json_data)
class Cassandra(object):
......@@ -330,6 +366,14 @@ class Cassandra(object):
If the environment variables `CASSANDRA_HOST` is set that will be used as
.. versionchanged:: 0.8.0
If the environment variable `CASSANDRA_JSON_DIR` and the directory does not exist, the data
will be downloaded to this directory. If it exists the JSON files from within this directory
will be used instead of downloading them from the database.
If not all data is available in this directory, it will be downloaded silently.
The json file format changed.
URLError: in case the connection times out (e.g. if one has not
connected to the IBPT-LAN)
......@@ -344,7 +388,7 @@ class Cassandra(object):
dataset(timestamps=[datetime.datetime(2016, 4, 13, 10, 59, 59, 250177), datetime.datetime(2016, 4, 13, 11, 0, 0, 250285), datetime.datetime(2016, 4, 13, 11, 0, 1, 250484)], values=[0.9991628617721655, 0.9992209719948126, 0.9991912895254449])
HOSTNAME = str(os.environ.get("CASSANDRA_HOST", ""))
PORT = int(os.environ.get("CASSANDRA_PORT", 9812))
PORT = int(os.environ.get("CASSANDRA_PORT", "9812"))
logging.debug("Hostname: {}:{}".format(HOSTNAME, PORT))
......@@ -366,34 +410,49 @@ class Cassandra(object):
count (int): somehow correlated to the number of data points fetched,
if not provided the raw data will be provided. Be careful, since
it very much data to load.
directory (str): directory to which the JSON data is written/from where it is read. Defaults to "."
directory (str): directory to which the JSON data is written/from where it is read.
Defaults to ".". If the environment variable `CASSANDRA_JSON_DIR` is set, that will
be used instead.
self.start_time = CassandraHelper().cassandra_time2dt(start)
self.end_time = CassandraHelper().cassandra_time2dt(end)
self.pv = pv
self.count = count = directory = os.environ.get("CASSANDRA_JSON_DIR", directory)
if "CASSANDRA_JSON_DIR" in os.environ.keys():
self._backup = True
pathlib.Path(, parents=True)
self._backup = False
self.json_filename = "{pv}_{start}_{end}_{count}.json".format(
self.json_file = path.join(directory, self.json_filename)
self.json_file = path.join(, self.json_filename)
self.timeout = None # type: Optional[int]
def __enter__(self):
# type: (Cassandra) -> str
""" ..versionadded:: 0.3 """
# type: (Cassandra) -> Tuple[List[datetime.datetime], List[Union[List[float], float]]]
""" ..versionadded:: 0.3
.. versionchanged:: 0.8
If the environment variable `CASSANDRA_JSON_DIR` is set, that directory will be used.
if not self._backup:
import tempfile = tempfile.mkdtemp()
self.json_file = path.join(, self.json_filename)
return self.get_json_local()
def __exit__(self, ctx_type, ctx_value, ctx_traceback):
# type: (Cassandra, Optional[Exception], Optional[str], Optional[TracebackType]) -> None
""" ..versionadded:: 0.3 """
if not self._backup:
from shutil import rmtree
......@@ -40,7 +40,7 @@ def _get_mean_values(
# pass
output_str = {
"full": "{name} = {val} \pm {std}",
"full": "{name} = {val} \\pm {std}",
"raw": "{val}",
"str": "{val}",
# Add requirements only needed for your unittests and during development here.
# They will be installed automatically when running `python test`.
# ATTENTION: Don't remove pytest-cov and pytest as they are needed.
......@@ -8,11 +8,15 @@
import datetime
import json
import os
import pathlib
import shutil
from contextlib import suppress
from importlib import reload
import cassandra
import pytest
from cassandra.cassandra import Cassandra
from cassandra.cassandra import Cassandra, CassandraHelper
from packaging import version
from .conftest import request_openurl
......@@ -26,6 +30,30 @@ except ImportError: # Py2
import urllib2 as request
@pytest.mark.usefixtures("cleandir", "setup_")
class TestCassandraHelper(object):
def json_file(self, cleandir):
to = pathlib.Path(cleandir) / "PEAKTUNE2.json"
shutil.copyfile((pathlib.Path(__file__).parent /
"data/A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json"), to)
return to.absolute()
def test_json_file_to_timeseries(self, json_file):
res = CassandraHelper().json_file_to_timeseries(json_file)
assert isinstance(res, tuple)
data = json.loads(
(pathlib.Path(__file__).parent /
assert res[0] == [datetime.datetime.fromtimestamp(d["time"] / 1e9) for d in data]
assert res[1] == [d["value"][0] for d in data]
assert res.timestamps == [datetime.datetime.fromtimestamp(d["time"] / 1e9) for d in data]
assert res.values == [d["value"][0] for d in data]
with pytest.raises(FileNotFoundError):
@pytest.mark.usefixtures("cleandir", "setup_")
class TestCassandraClass(object):
......@@ -41,7 +69,12 @@ class TestCassandraClass(object):
assert os.path.abspath( == os.path.abspath(os.path.curdir)
# assert pathlib.Path( == pathlib.Path.cwd()
if version.parse(cassandra.__version__) <= version.parse("0.7.2"):
assert cas2.json_filename == "A:SR:BeamInfo:01:Energy_10:10:10_10:10:30_None.json"
assert cas2.json_filename == \
"A:SR:BeamInfo:01:Energy_2017-11-02T10:10:10_2017-11-02T10:10:30" \
def test_gen_url(self, cas, setup_):
url = cas.gen_url()
......@@ -101,6 +134,39 @@ class TestCassandraClass(object):
assert cas.HOSTNAME == "test"
assert cas.PORT == 1234
def env_backup(self, monkeypatch):
with monkeypatch.context() as m:
m.setenv("CASSANDRA_JSON_DIR", "backup_test")
with suppress(FileNotFoundError):
def test_backup(self, setup_, env_backup, monkeypatch):
start, end, pv = setup_
def off(url, timeout):
raise URLError(
"Request had a timeout. Maybe you're not inside the IBPT-CN-LAN or provided a wrong PV name"
assert not pathlib.Path("backup_test").is_dir()
with cassandra.cassandra.Cassandra(start, end, pv) as cas:
assert pathlib.Path("backup_test").is_dir()
assert pathlib.Path("backup_test/A:SR:BeamInfo:01:Energy_2017-11-02T10:10:10_2017-11"
"-02T10:10:30_None.json").is_file(), "Backup file does not exist (1)"
monkeypatch.setattr(request, "urlopen", off)
with cassandra.cassandra.Cassandra(start, end, pv) as cas2:
assert pathlib.Path("backup_test").is_dir()
assert pathlib.Path("backup_test/A:SR:BeamInfo:01:Energy_2017-11-02T10:10:10_2017-11"
"-02T10:10:30_None.json").is_file(), "Backup file does not exist (2)"
assert cas[0] == cas2[0]
assert cas[1] == cas2[1]
def test_download_cassandra_data_online(self, cas, monkeypatch):
with open(
os.path.dirname(os.path.abspath(__file__)) +
......@@ -122,16 +188,16 @@ class TestCassandraClass(object):
with pytest.raises(URLError, match=r".*[Tt]imeout.*"):
def test_dump_cassandra_data(self, cas, cleandir, monkeypatch):
def test_dump_cassandra_data(self, cas, cleandir, monkeypatch, shared_datadir):
assert not os.path.isfile(os.path.join(cleandir, cas.json_filename))
# assert not pathlib.Path(cleandir / cas.json_filename).is_file()
assert not pathlib.Path(cleandir / cas.json_filename).is_file()
monkeypatch.setattr(request, "urlopen", request_openurl)
assert str(cleandir / cas.json_filename) == cas.dump_cassandra_data()
assert os.path.isfile(os.path.join(cleandir, cas.json_filename))
# assert pathlib.Path(cleandir / cas.json_filename).is_file()
assert pathlib.Path(cleandir / cas.json_filename).is_file()
with open(str(cleandir / cas.json_filename), "r") as got, open(
os.path.dirname(os.path.abspath(__file__)) +
"/data/A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json", "r") as expected:
shared_datadir / "A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json",
"r") as expected:
assert json.load(expected) == json.load(got)
def test_with(self, setup_dl, monkeypatch):
......@@ -8,7 +8,6 @@ Unit tests for cassandra.pd
import datetime
import doctest
import json
import os
import sys
import unittest
from typing import List, NamedTuple
......@@ -58,7 +57,7 @@ class TestCassandraPD(object):
@pytest.mark.skipif(sys.version_info < (3, 4), reason="requires python3.4")
def test_pvs2pd(self, monkeypatch, setup_pd, cleandir):
def test_pvs2pd(self, monkeypatch, setup_pd, cleandir, shared_datadir):
monkeypatch.setattr(request, "urlopen", self.urlopen_nus)
ret = pvs2pd(*setup_pd)
assert [0.81171229137420042, 0.72489568710029628] == list(ret.mean())
......@@ -69,16 +68,12 @@ class TestCassandraPD(object):
with pytest.raises(ValueError):
pvs2pd(*setup_pd, upsample=True)
fn = "A:SR:BBB:01:X:SRAM:PEAKTUNE2_18:28:36_18:28:51_None.json"
fn_in = "A:SR:BBB:01:X:SRAM:PEAKTUNE2_18:28:36_18:28:51_None.json"
fn = "A:SR:BBB:01:X:SRAM:PEAKTUNE2_2016-08-05T18:28:36_2016-08-05T18:28:51_None.json"
assert not pathlib.Path(cleandir / fn).is_file()
pvs2pd(*setup_pd, save_local=True)
with open(str(cleandir / fn), "r") as got, open(
) as expected:
with open(str(cleandir / fn), "r") as got, \
open((shared_datadir / fn_in).absolute(), "r", ) as expected:
assert json.load(expected) == json.load(got)
......@@ -10,8 +10,10 @@ commands =
deps =
changedir = {toxinidir}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment