# -*- coding: utf-8 -*-
"""
    :Authors: Julian Gethmann
    :Contact: atb@gethmann.org
    :Date: 2016-03-02

    Load data from a `pv` from `start_time` to `end_time` and plot it.
    If run without any argument, this script tries to connect to
    las-bernhard.anka.kit.edu as a proxy.
    Afterwards it plots the data and stores it on the local machine.
    If run with "anka" as its argument, it runs without the proxy (e.g. via VPN).
"""
from __future__ import with_statement

import datetime
import json
import socket
import time
from collections import namedtuple
from os import path
from sys import version_info
from typing import List, Optional, Tuple, Union

try:  # Py3
    from urllib.error import URLError
except ImportError:  # Py2
    from urllib2 import URLError

from cassandra import __version__

__author__ = "Julian Gethmann"
__copyright__ = "Julian Gethmann"
__license__ = "mit"
__all__ = ["Cassandra", "CassandraHelper", "Pvs"]


class Pvs(object):
    """Provide shortcuts for some PV names.

    The class is only a namespace: look up the full PV name with
    ``Pvs.pv["<shortcut>"]`` and pass it on, e.g. as the ``pv`` argument of
    :class:`Cassandra`.

    Attributes:
        pv (dict): maps a short alias (str) to the full PV name (str)
    """
    pv = {
        "fill": "A:SR:OperationStatus:01:FillNumber",
        "current": "A:SR:BeamInfo:01:Current",
        "lifetime": "A:SR:BeamInfo:01:Lifetime",
        "energy": "A:SR:BeamInfo:01:Energy",
        "bpms_pos": "A:SR:Orbit:01:BPM:Positions",
        "bpms_names": "A:SR:Orbit:01:BPM:Names",
        "nu_y": "A:SR:BBB:01:Y:SRAM:PEAKTUNE2",
        "nu_x": "A:SR:BBB:01:X:SRAM:PEAKTUNE2",
        "catact_field": "A:ID-S3:Wig:01:Position:Readback",
        "sul_field": "A:ID-S2:Wig:01:Position:Readback",
        "rf_old": "ACS:RFSG_D.01:frequency",
        "rf": "A:TI:SignGen:SR-01:Frequency",
        "q1_old": "ACS:PQ1_S.01:current",
        "q2_old": "ACS:PQ2_S.01:current",
        "q3_old": "ACS:PQ3_S.01:current",
        "q4_old": "ACS:PQ4_S.01:current",
        "q5_old": "ACS:PQ5_S.01:current",
        "sv_old": "ACS:PSV_S.01:current",
        "sh_old": "ACS:PSH_S.01:current",
        "bend": "A:SR:PS:MB-01:Current:Readback",
        "q1": "A:SR:PS:MQ1-01:Current:Readback",
        "q2": "A:SR:PS:MQ2-01:Current:Readback",
        "q3": "A:SR:PS:MQ3-01:Current:Readback",
        "q4": "A:SR:PS:MQ4-01:Current:Readback",
        "q5": "A:SR:PS:MQ5-01:Current:Readback",
        "sv": "A:SR:PS:MSV-01:Current:Readback",
        "sh": "A:SR:PS:MSH-01:Current:Readback",
        "q1_set": "A:SR:PS:MQ1-01:Current:Setpoint:Get",
        "q2_set": "A:SR:PS:MQ2-01:Current:Setpoint:Get",
        "q3_set": "A:SR:PS:MQ3-01:Current:Setpoint:Get",
        "q4_set": "A:SR:PS:MQ4-01:Current:Setpoint:Get",
        "q5_set": "A:SR:PS:MQ5-01:Current:Setpoint:Get",
        "sv_set": "A:SR:PS:MSV-01:Current:Setpoint:Get",
        "sh_set": "A:SR:PS:MSH-01:Current:Setpoint:Get",
        "preVac": "A:SR-S1:VIPPS:08:P2:Rdb",
        "postVac": "A:SR-S1:VIPPS:08:P3:Rdb",
        "clic_field": "A:ID-S1:Wig:01:Position:Readback",
        "clic_field_old": "A:ID-S2:WigBINP:Position:Readback",
        "clic_liner_in": "A:ID-S1:Wig:01:JB:Temp:27",
        "clic_liner_center": "A:ID-S1:Wig:01:JB:Temp:28",
        "clic_liner_out": "A:ID-S1:Wig:01:JB:Temp:29",
        "clic_LHe": "A:ID-S1:Wig:01:LiquidHe:Level",
        "clic_mag_top_temp": "A:ID-S1:Wig:01:JB:Temp:01",
        "clic_mag_bottom_temp": "A:ID-S1:Wig:01:JB:Temp:02",
        "clic_I1": "A:ID-S1:Wig:01:PS1:Current:Readback",
        "clic_I2": "A:ID-S1:Wig:01:PS2:Current:Readback",
        "clic_I3": "A:ID-S1:Wig:01:PS3:Current:Readback",
        "clic_I4": "A:ID-S1:Wig:01:PS4:Current:Readback",
        "clic_he_bar": "A:ID-S1:Wig:01:JB:Pressure:Abs",
        "clic_iso_vac_mbar": "A:ID-S1:Wig:01:JB:Pressure:Vac",
        "delta_ps1": "A:ID-S1:PS:MCV-01:Current",
        "delta_ps2": "A:ID-S1:PS:MCV-02:Current",
        "rf_sync": "A:SR:RF:01:SynchrotronFreq:Get",
        "rf_volt": "A:SR:RF:01:Voltage",
        "orbit_live_x": "A:SR:Orbit:01:Live:X",
        "orbit_live_y": "A:SR:Orbit:01:Live:Y",
        "orbit_ref_x": "A:SR:Orbit:01:Reference:X",
        "orbit_ref_y": "A:SR:Orbit:01:Reference:Y",
    }


class CassandraHelper(object):
    """Helper functions that you can use for dealing with :class:`Cassandra`'s input and output.

    The helpers do not operate on :class:`Cassandra` instances; they are
    stateless static methods and can be called on the class or on an instance.

    Attributes:
        PY2 (bool): True if run with Python 2
        PY3 (bool): True if run with Python 3
    """
    PY2 = (version_info.major == 2)
    PY3 = (version_info.major == 3)

    # ``strptime`` patterns accepted by :func:`cassandra_time2dt`.  Their
    # literal separators differ, so a string can match at most one of them.
    _TIME_FORMATS = (
        "%Y/%m/%d %H:%M:%S",  # CSS string
        "%Y-%m-%dT%Hh%Mm%S",  # filename convention
        "%Y-%m-%dT%H:%M:%S",  # ISO 8601
        "%Y-%m-%d_%H-%M-%S",  # MML name format
    )

    # Return type of :func:`cassandra_json_to_timeseries`; created once
    # instead of on every call.
    _Dataset = namedtuple("dataset", ["timestamps", "values"])

    @staticmethod
    def cassandra_time(time_string):
        # type: (Union[str, datetime.datetime]) -> str
        """Transcode the readable Cassandra time to ns since EPOCH.

        The wall-clock fields of the (naive) date are interpreted in the local
        time zone of the machine, so the result depends on that time zone.
        Microseconds are not taken into account.

        Args:
            time_string (str|datetime): anything :func:`cassandra_time2dt` accepts

        Returns:
            str: nano seconds since EPOCH which is the format Cassandra wants to have in its JSON queries

        Raises:
            ValueError: if your input does not fit the format
            TypeError: if your input is neither a string nor a datetime

        Examples:
            The expected values hold for a local time zone of UTC+1.

            >>> CassandraHelper().cassandra_time("2016/03/02 14:33:00")
            '1456925580000000000'
            >>> CassandraHelper.cassandra_time("2016/03/02 14:33:00")
            '1456925580000000000'
            >>> CassandraHelper().cassandra_time(datetime.datetime(2016, 3, 2, 14, 33))
            '1456925580000000000'
            >>> CassandraHelper.cassandra_time("2016-03-02T14h33m00")
            '1456925580000000000'
            >>> CassandraHelper.cassandra_time("2016-03-02T14:33:00")
            '1456925580000000000'

        .. versionadded:: 0.2
            The possibility to provide :py:mod:`datetime` objects as argument.

        .. versionchanged:: 0.3
            The possibility to input the new filename convention time string or
            ISO 8601 formatted dates

        .. seealso:: :func:`cassandra_time2dt`
        """
        moment = CassandraHelper.cassandra_time2dt(time_string)
        # ``strftime("%s")`` is a platform-specific extension that is missing
        # e.g. on Windows; ``time.mktime`` does the same local-time conversion
        # portably.
        seconds = int(time.mktime(moment.timetuple()))
        return "{0}000000000".format(seconds)

    @staticmethod
    def cassandra_time2dt(time_string):
        # type: (Union[str, datetime.datetime]) -> datetime.datetime
        """Return a :obj:`datetime.datetime` for a given date in cassandra's format

        Args:
            time_string (str|datetime):
                * "YYYY/MM/DD HH:MM:SS" (which is written to CSS-Exports) or
                * "YYYY-MM-DDTHHhMMmSS" (which is the convention of the IBPT-THz-group) or
                * "YYYY-MM-DD_HH-MM-SS" (which is the MML-names format) or
                * "YYYY-MM-DDTHH:MM:SS" (which is ISO 8601) or
                * Python :py:mod:`datetime` object (returned unchanged).

        Returns:
            datetime.datetime: time_string converted to :obj:`datetime.datetime`

        Raises:
            ValueError: if `time_string` does not match the allowed formats
            TypeError: if `time_string` is neither a string nor a datetime

        .. warning::
            In the case of a 60th second there will be a misleading ValueError
            message, since Python cannot handle minutes with 60 seconds!

        Examples:
            >>> CassandraHelper().cassandra_time2dt("2016/03/02 14:33:00")
            datetime.datetime(2016, 3, 2, 14, 33)
            >>> CassandraHelper.cassandra_time2dt("2016/03/02 14:33:00")
            datetime.datetime(2016, 3, 2, 14, 33)
            >>> CassandraHelper.cassandra_time2dt("2016-03-02T14h33m00")
            datetime.datetime(2016, 3, 2, 14, 33)
            >>> CassandraHelper().cassandra_time2dt("2016-03-02T14:33:00")
            datetime.datetime(2016, 3, 2, 14, 33)
            >>> CassandraHelper().cassandra_time2dt("12")
            Traceback (most recent call last):
                ...
            ValueError: `time_string` must not be of the form `12` and type str! See docstring for possible forms.
            >>> CassandraHelper().cassandra_time2dt(12)
            Traceback (most recent call last):
                ...
            TypeError: Time (12) must either be a `datetime.datetime` or a string as described in the docstring and not a int.

            >>> CassandraHelper.cassandra_time2dt("2017/03/17 13:22:60")
            Traceback (most recent call last):
                ...
            ValueError: `time_string` must not be of the form `2017/03/17 13:22:60` and type str! See docstring for possible forms.

        .. versionadded:: 0.5.0
        """
        if isinstance(time_string, datetime.datetime):
            return time_string

        for format_ in CassandraHelper._TIME_FORMATS:
            try:
                return datetime.datetime.strptime(time_string, format_)
            except ValueError:
                # Not this format; try the next one.
                continue
            except TypeError:
                # ``strptime`` refuses non-string input; report it in terms of
                # this function's contract.
                raise TypeError("Time ({}) must either be a `datetime.datetime` or a string"
                                " as described in the docstring and not a {}.".format(
                                    time_string, type(time_string).__name__))

        raise ValueError("`time_string` must not be of the form `{}` and type {}! "
                         "See docstring for possible forms.".format(
                             time_string, type(time_string).__name__))

    @staticmethod
    def cassandra_json_to_timeseries(json_data):
        # type: (List[dict]) -> Tuple[List[datetime.datetime], List[Union[List[float], float]]]
        """Return lists of timestamps and values for given Cassandra's JSON data.

        Only entries whose ``["severity"]["level"]`` equals ``"OK"`` are used.
        Entries with an empty ``"value"`` list are skipped as well, so that
        ``timestamps[i]`` always belongs to ``values[i]``.

        Args:
            json_data (list): JSON list with Cassandra entries. Each entry is a
                dict with the keys ``"value"`` (list), ``"time"`` (ns since
                EPOCH) and ``"severity"`` (dict with the key ``"level"``).

        Returns:
            tuple:
                A named tuple ``dataset(timestamps, values)``. It consists of
                a list of datetime objects (local time) and
                a list of the corresponding values either as floats or
                wrapped inside a list if there are more than one value
                to be returned.

        Examples:
            The expected timestamps hold for a local time zone of UTC+1.

            >>> CassandraHelper().cassandra_json_to_timeseries([])
            dataset(timestamps=[], values=[])
            >>> CassandraHelper().cassandra_json_to_timeseries([{"value": [1.3e-07],
            ...    "time": 1454698800000000000,
            ...    "severity": {"level": "OK"}}])
            dataset(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[1.3e-07])
            >>> CassandraHelper().cassandra_json_to_timeseries([{"value": [1.3e-07, 1.4e-07],
            ...    "time": 1454698800000000000,
            ...    "severity": {"level": "OK"}}])
            dataset(timestamps=[datetime.datetime(2016, 2, 5, 20, 0)], values=[[1.3e-07, 1.4e-07]])

        .. note:: This function doesn't check for sanity of the data, yet.
        .. warning:: The returned lists may be empty!
        .. versionchanged:: 0.3.0
        """
        timestamps = []
        values = []
        for entry in json_data:
            if entry["severity"]["level"] != "OK":
                continue
            sample = entry["value"]
            if len(sample) == 0:
                # No value to pair with this timestamp; dropping both keeps
                # the two lists the same length.
                continue
            timestamps.append(datetime.datetime.fromtimestamp(entry["time"] / 1e9))
            # A single value is unwrapped, several values stay together as a list.
            values.append(sample[0] if len(sample) == 1 else sample)

        return CassandraHelper._Dataset(timestamps, values)


class Cassandra(object):
    """Interface to KARA's Cassandra database

    Attributes:
        start_time (datetime): time to start fetching data
        end_time (datetime): time to end fetching data
        pv (str): exact name of one PV
        count (int|None): somehow correlated to the number of data points fetched
        directory (str): directory to which the JSON file is written/from where it is read
        json_filename (str): filename to which the JSON data is written/read
        json_file (str): `directory` joined with `json_filename`
        HOSTNAME (str): hostname of the Cassandra server
        PORT (int): port on which the Cassandra server runs
        RETRIES (int): number of download attempts before giving up
        timeout (float): optional; seconds to wait for the server per
            attempt. If the attribute is not set, 5 seconds are used.

    Can also be used as a context manager. The ``with`` target is the result
    of :func:`get_json_local`; the JSON file is then kept in a temporary
    directory that is removed when the block is left.

    Raises:
        URLError: in case the connection times out (e.g. if one has not
            connected to the IBPT-LAN)

    Examples:
        The example needs access to the Cassandra server.

        >>> from datetime import datetime
        >>> from cassandra import Pvs
        >>> with Cassandra(datetime(2016,4,13,11), datetime(2016,4,13,11,0,1),  # doctest: +SKIP
        ... Pvs.pv["clic_field"], 1) as cas:
        ...     clic_field = cas
        >>> clic_field  # doctest: +SKIP
        dataset(timestamps=[datetime.datetime(2016, 4, 13, 10, 59, 59, 250177), datetime.datetime(2016, 4, 13, 11, 0, 0, 250285), datetime.datetime(2016, 4, 13, 11, 0, 1, 250484)], values=[0.9991628617721655, 0.9992209719948126, 0.9991912895254449])
    """
    HOSTNAME = "ankasr-archiver.anka.kit.edu"
    PORT = 9812
    RETRIES = 10

    def __init__(
            self,
            start,  # type: Union[str, datetime.datetime]
            end,  # type: Union[str, datetime.datetime]
            pv,  # type: str
            count=None,  # type: Optional[int]
            directory=".",  # type: str
    ):
        # type: (...) -> None
        """Initialize the Cassandra data set.

        Nothing is downloaded here; only the query parameters and the name of
        the local JSON file are set up.

        Args:
            start (str, datetime.datetime): time to start fetching data in
                one of the formats accepted by
                :func:`CassandraHelper.cassandra_time2dt`, e.g. "%Y/%m/%d %H:%M:%S"
            end (str, datetime.datetime): time to end fetching data
            pv (str): exact name of one PV
            count (int): somehow correlated to the number of data points fetched,
                if not provided the raw data will be provided. Be careful, since
                this can be very much data to load.
            directory (str): directory to which the JSON data is written/from where it is read. Defaults to "."

        Raises:
            ValueError: if `start` or `end` does not match an allowed format
            TypeError: if `start` or `end` is neither a string nor a datetime
        """
        self.start_time = CassandraHelper.cassandra_time2dt(start)
        self.end_time = CassandraHelper.cassandra_time2dt(end)
        self.pv = pv
        self.count = count

        self.directory = directory
        self.json_filename = self._build_json_filename()
        self.json_file = path.join(directory, self.json_filename)

    def _build_json_filename(self):
        # type: () -> str
        """Return the name of the JSON file for this PV, time range and count."""
        # The file doubles as a cache (see get_json_local), so its name must
        # identify the complete query. A time-of-day-only stamp would let the
        # same time on different days share one file.
        stamp = "%Y-%m-%dT%Hh%Mm%S"
        return "{pv}_{start}_{end}_{count}.json".format(
            pv=self.pv,
            start=self.start_time.strftime(stamp),
            end=self.end_time.strftime(stamp),
            count=self.count,
        )

    def __enter__(self):
        """Fetch the data into a fresh temporary directory and return it.

        Returns:
            tuple: the result of :func:`get_json_local`

        .. versionadded:: 0.3
        """
        import tempfile
        from shutil import rmtree
        self.directory = tempfile.mkdtemp()
        self.json_file = path.join(self.directory, self.json_filename)
        try:
            return self.get_json_local()
        except BaseException:
            # __exit__ is not called when __enter__ raises, so the temporary
            # directory has to be removed here.
            rmtree(self.directory, ignore_errors=True)
            raise

    def __exit__(self, ctx_type, ctx_value, ctx_traceback):
        """Remove the temporary directory created by :func:`__enter__`.

        .. versionadded:: 0.3
        """
        from shutil import rmtree
        rmtree(self.directory)

    @staticmethod
    def check_connection():
        # type: () -> bool
        """Return if the connection is ok or not

        Requests one sample of the ``energy`` PV with a timeout of one second
        per attempt.

        Returns:
            bool: True if the connection is ok, False if an error occurred
        """
        probe = Cassandra("2017-07-07T08:08:08", "2017-07-07T08:08:08", Pvs.pv["energy"], count=1)
        probe.timeout = 1
        try:
            probe._download_cassandra_data()
        except EnvironmentError:
            # URLError, socket.timeout and (Python 3) TimeoutError all derive
            # from EnvironmentError, which, unlike TimeoutError, also exists
            # on Python 2.
            return False
        return True

    def gen_url(self):
        # type: () -> str
        """Return the URL to be fetched from Cassandra.

        If count is `None` then the count parameter is not included in the URL
        which should cause the request of the raw data.

        Returns:
            str: URL of the samples of `pv` between `start_time` and `end_time`

        .. versionchanged:: 0.5

        .. versionchanged:: 0.6
            URL schema changed, because of a new Cassandra deployment
        """
        url = ("http://{host}:{port}/archive-access/api/1.0/archive/1/samples/{pv}"
               "?start={starttime}&end={endtime}")
        if self.count:
            url += "&count={count}"

        return url.format(
            host=self.HOSTNAME,
            port=self.PORT,
            pv=self.pv,
            starttime=CassandraHelper.cassandra_time(self.start_time),
            endtime=CassandraHelper.cassandra_time(self.end_time),
            count=self.count,
        )

    def _download_cassandra_data(self):
        # type: () -> List[dict]
        """Return Cassandra's answer for :func:`gen_url` as a parsed JSON object.

        The request is repeated up to `RETRIES` times if it fails with an
        `URLError` or a read timeout.

        Raises:
            URLError: if none of the attempts succeeded
        """
        from contextlib import closing
        try:  # Py3
            from urllib import request
        except ImportError:  # Py2
            import urllib2 as request

        url = self.gen_url()
        timeout = getattr(self, "timeout", 5)

        for _ in range(self.RETRIES):
            try:
                # ``closing`` works for the response objects of both Python
                # versions and releases the connection even if reading fails.
                with closing(request.urlopen(url, timeout=timeout)) as response:
                    return json.loads(response.read().decode("utf8"))
            except (URLError, socket.timeout):
                # A timeout while reading the body is raised as a bare
                # socket.timeout instead of an URLError; retry in both cases.
                continue

        raise URLError(
            "Request had a timeout. Maybe you're not inside the IBPT-CN-LAN or provided a wrong PV name"
        )

    def dump_cassandra_data(self):
        # type: () -> str
        """Dump the JSON file to a file named like the PV and time that is returned

        Dump a JSON file fetched from the Cassandra `host` and return its name.

        Returns:
            absolute path and filename of the JSON file.

        Raises:
            URLError: if the data could not be downloaded

        .. versionchanged:: 0.7.2
            Remove argument `show_url` since it was not implemented either.
        """
        json_data = self._download_cassandra_data()
        with open(self.json_file, "w") as fobj:
            json.dump(json_data, fobj)
        return path.abspath(self.json_file)

    def get_json_local(self):
        # type: () -> Tuple[List[datetime.datetime], List[Union[List[float], float]]]
        """Return timestamps and values for given Cassandra object and copy JSON file if necessary.

        The data is downloaded only if the JSON file does not exist yet or
        holds at most two bytes; otherwise the existing file is used.

        Returns:
            tuple: see :func:`CassandraHelper.cassandra_json_to_timeseries`
        """
        if not path.isfile(self.json_file) or path.getsize(self.json_file) <= 2:
            self.dump_cassandra_data()
        with open(self.json_file, "r") as fobj:
            json_data = json.load(fobj)
        return CassandraHelper.cassandra_json_to_timeseries(json_data)


if __name__ == "__main__":
    # Executed as a script: run the examples embedded in the docstrings.
    from doctest import testmod

    testmod()