Commit 93b30dae authored by julian.gethmann's avatar julian.gethmann

Update timestamps in docstring

* Update timestamps in docstrings
* Add mypy to setup.cfg
* Fix formatting issues
parent d13ef5c7
......@@ -31,3 +31,8 @@ repos:
sha: v0.5.0
hooks:
- id: mypy
exclude: >
(?x)^(
tests/.*|
docs/.*
)$
......@@ -86,6 +86,15 @@ exclude =
docs/conf.py
max-line-length = 100
[mypy]
python_version = 3.4
warn_return_any = True
warn_unused_configs = True
ignore_missing_imports = True
[mypy.tests]
ignore_errors = True
[pyscaffold]
# PyScaffold's parameters when the project was created.
# This will be used when updating. Do not change!
......
......@@ -19,8 +19,7 @@ from sys import version_info
from typing import List, Optional, Union
import pytz
from cassandra import __version__
from cassandra import __version__, tz
__author__ = "Julian Gethmann"
__copyright__ = "Julian Gethmann"
......@@ -102,6 +101,7 @@ class Pvs(object):
class CEST(datetime.tzinfo):
def utcoffset(self, dt):
return datetime.timedelta(hours=1)
......@@ -127,8 +127,8 @@ class CassandraHelper(object):
@staticmethod
def to_utc(
dt: datetime.datetime,
local_zone: pytz.timezone = pytz.timezone('Europe/Berlin'),
) -> datetime.datetime:
local_zone: pytz.tzinfo.DstTzInfo = pytz.timezone('Europe/Berlin'),
) -> datetime.datetime:
# make naive datetimes aware of their (our local 'Europe/Berlin') timezone
utc = pytz.timezone('UTC')
if not dt.tzinfo:
......@@ -140,18 +140,18 @@ class CassandraHelper(object):
""" Aware UTC datetime as input! """
# Since KARA is located in Germany and the data is saved in a local time
# taken as UTC and then the EPOCH of it.
if dt.tzinfo is None or (dt.tzinfo != pytz.timezone('UTC') and
dt.tzinfo != datetime.timezone.utc):
if dt.tzinfo is None or (dt.tzinfo != pytz.timezone('UTC')
and dt.tzinfo != datetime.timezone.utc):
raise ValueError("`dt` must be a timezone aware UTC datetime object!")
berlin = pytz.timezone('Europe/Berlin')
dt = berlin.normalize(dt.astimezone(berlin))
naive = dt.replace(tzinfo=None)
try: # Py3.3+
#return str(int(naive.timestamp())) + '0' * 9
return str(int((naive - datetime.datetime(1970,1,1)).total_seconds())) + "0" * 9
return str(int((naive - datetime.datetime(1970, 1, 1)).total_seconds())) + "0" * 9
except AttributeError:
import time
return str(int(time.mktime(naive))) + '0' * 9
return str(int(time.mktime(naive.utctimetuple()))) + '0' * 9
@staticmethod
def cassandra_time(time_string):
......@@ -171,19 +171,19 @@ class CassandraHelper(object):
Examples:
>>> CassandraHelper().cassandra_time("2016/03/02 14:33:00")
'1456925580000000000'
'1456929180000000000'
>>> CassandraHelper.cassandra_time("2016/03/02 14:33:00")
'1456925580000000000'
'1456929180000000000'
>>> CassandraHelper().cassandra_time(datetime.datetime(2016, 3, 2, 14, 33))
'1456925580000000000'
'1456929180000000000'
>>> CassandraHelper.cassandra_time("2016-03-02T14h33m00")
'1456925580000000000'
'1456929180000000000'
>>> CassandraHelper.cassandra_time("2016-03-02T14:33:00")
'1456925580000000000'
'1456929180000000000'
>>> CassandraHelper.cassandra_time("2016-03-02T14h33m00")
'1456925580000000000'
'1456929180000000000'
>>> CassandraHelper.cassandra_time("2016-03-02T14:33:00")
'1456925580000000000'
'1456929180000000000'
.. versionadded:: 0.2
The possibility to provide :py:mod:`datetime` objects as argument.
......@@ -205,13 +205,12 @@ class CassandraHelper(object):
if not isinstance(time_string, datetime.datetime):
dt = CassandraHelper.cassandra_time2dt(time_string)
else:
if time_string.tzinfo is None:
_dt = tz.naive_as_berlin(time_string)
dt = CassandraHelper.to_utc(_dt)
dt = time_string
if version_info >= (3, 3):
if dt.tzinfo is not None:
utc_aware = CassandraHelper.to_utc(dt, local_zone=dt.tzinfo)
else:
utc_aware = CassandraHelper.to_utc(dt)
integer_timestamp = CassandraHelper.utc_to_Cassandra_EPOCH(utc_aware)
integer_timestamp = CassandraHelper.utc_to_Cassandra_EPOCH(dt)
else:
raise NotImplementedError('You are using a not supported Python version.')
......@@ -242,13 +241,13 @@ class CassandraHelper(object):
Examples:
>>> CassandraHelper().cassandra_time2dt("2016/03/02 14:33:00")
datetime.datetime(2016, 3, 2, 14, 33)
datetime.datetime(2016, 3, 2, 14, 33, tzinfo=utc)
>>> CassandraHelper.cassandra_time2dt("2016/03/02 14:33:00")
datetime.datetime(2016, 3, 2, 14, 33)
datetime.datetime(2016, 3, 2, 14, 33, tzinfo=utc)
>>> CassandraHelper.cassandra_time2dt("2016-03-02T14h33m00")
datetime.datetime(2016, 3, 2, 14, 33)
datetime.datetime(2016, 3, 2, 14, 33, tzinfo=utc)
>>> CassandraHelper().cassandra_time2dt("2016-03-02T14:33:00")
datetime.datetime(2016, 3, 2, 14, 33)
datetime.datetime(2016, 3, 2, 14, 33, tzinfo=utc)
>>> CassandraHelper().cassandra_time2dt("12")
Traceback (most recent call last):
...
......@@ -268,9 +267,9 @@ class CassandraHelper(object):
berlin = pytz.timezone('Europe/Berlin')
if isinstance(time_string, datetime.datetime):
if not time_string.tzinfo:
time_string = berlin.normalize(berlin.localize(time_string))
return time_string
if time_string.tzinfo is None:
time_string = tz.naive_as_berlin(time_string)
return tz.aware_berlin_to_utc(time_string)
else:
ret = None
for format_ in [
......@@ -281,19 +280,18 @@ class CassandraHelper(object):
]:
try:
ret = datetime.datetime.strptime(time_string, format_)
ret = tz.naive_as_berlin()
except ValueError:
pass
except TypeError:
raise TypeError(
"Time ({}) must either be a `datetime.datetime` or a string"
" as described in the docstring and not a {}.".format(
time_string, type(time_string).__name__))
raise TypeError("Time ({}) must either be a `datetime.datetime` or a string"
" as described in the docstring and not a {}.".format(
time_string, type(time_string).__name__))
if not ret:
raise ValueError(
"`time_string` must not be of the form `{}` and type {}! "
"See docstring for possible forms.".format(
time_string, type(time_string).__name__))
raise ValueError("`time_string` must not be of the form `{}` and type {}! "
"See docstring for possible forms.".format(
time_string, type(time_string).__name__))
else:
return ret
......@@ -337,8 +335,8 @@ class CassandraHelper(object):
values.append(entry["value"])
# if entry["status"] == "OK" or entry["severity"]["level"] == "OK"]
timestamps = [
datetime.datetime.fromtimestamp(entry["time"] / 1e9)
for entry in json_data if entry["severity"]["level"] == "OK"
datetime.datetime.fromtimestamp(entry["time"] / 1e9) for entry in json_data
if entry["severity"]["level"] == "OK"
]
dataset = namedtuple("dataset", ["timestamps", "values"])
......@@ -432,11 +430,7 @@ class Cassandra(object):
bool: True if the connection is ok, False if an error occurred
"""
try:
tmp = Cassandra(
"2017-07-07T08:08:08",
"2017-07-07T08:08:08",
Pvs.pv["energy"],
count=1)
tmp = Cassandra("2017-07-07T08:08:08", "2017-07-07T08:08:08", Pvs.pv["energy"], count=1)
tmp.timeout = 1
tmp._download_cassandra_data()
except (TimeoutError, URLError):
......@@ -492,9 +486,7 @@ class Cassandra(object):
except URLError:
pass
else:
raise URLError(
"Request had a timeout. Maybe you're not inside the ANKA-LAN"
)
raise URLError("Request had a timeout. Maybe you're not inside the ANKA-LAN")
elif CassandraHelper.PY2:
import urllib2 as request
for err_count in range(self.RETRIES):
......@@ -510,8 +502,7 @@ class Cassandra(object):
"Request had a timeout. Maybe you're not inside the IBPT-CN-LAN or provided a wrong PV name"
)
else:
raise NotImplementedError(
"Not implemented for other versions than 2 or 3!")
raise NotImplementedError("Not implemented for other versions than 2 or 3!")
return json_data
......
# coding=utf-8
"""Handle all issues concerning time and dates"""
import datetime
import pytz
from pytz import utc
......@@ -13,8 +13,8 @@ def naive_to_berlin(dt: datetime.datetime) -> datetime.datetime:
pass
def naive_to_utc(dt: datetime.datetime) -> datetime.datetime:
pass
def naive_berlin_to_utc(dt: datetime.datetime) -> datetime.datetime:
return aware_berlin_to_utc(naive_as_berlin(dt))
def naive_as_berlin(dt: datetime.datetime) -> datetime.datetime:
......@@ -40,7 +40,7 @@ def naive_as_utc(dt: datetime.datetime) -> datetime.datetime:
return utc.normalize(utc.localize(dt))
def aware_rename_tz(dt: datetime.datetime, tz: str="Europe/Berlin") -> datetime.datetime:
def aware_rename_tz(dt: datetime.datetime, tz: str = "Europe/Berlin") -> datetime.datetime:
"""Rename the timezone, but do not shift it accordingly."""
return dt.replace(tzinfo=pytz.timezone(tz))
......@@ -69,4 +69,8 @@ def aware_to_naive_berlin(dt: datetime.datetime) -> datetime.datetime:
def aware_as_naive(dt: datetime.datetime) -> datetime.datetime:
return dt.replace(tzinfo=None)
\ No newline at end of file
return dt.replace(tzinfo=None)
def nsepoch2utc(time: int) -> datetime.datetime:
return datetime.datetime.utcfromtimestamp(time // 1e9)
......@@ -12,37 +12,38 @@ import pathlib
from urllib import error, request
from urllib.error import URLError
import pytz
import cassandra
import pytest
import pytz
from cassandra.cassandra import CassandraHelper
# from cassandra.cassandra import *
from .conftest import request_openurl
@pytest.mark.usefixtures("setup_", "utc_test_case", "dst_test_case")
class TestCassandraHelperClass(object):
def test_to_utc(self, utc_test_case, dst_test_case):
test_case = datetime.datetime(1970, 1, 1, 3)
# utc_test_case = datetime.datetime(1970,1,1, 2, tzinfo=datetime.timezone.utc)
berlin_test_case = pytz.timezone('Europe/Berlin').localize(datetime.datetime(1970,1,1, 3))
berlin_test_case = pytz.timezone('Europe/Berlin').localize(datetime.datetime(1970, 1, 1, 3))
assert CassandraHelper.to_utc(test_case).tzinfo
assert utc_test_case == CassandraHelper.to_utc(test_case)
assert utc_test_case == CassandraHelper.to_utc(utc_test_case)
assert utc_test_case == CassandraHelper.to_utc(berlin_test_case, pytz.timezone('Europe/Berlin'))
assert utc_test_case == CassandraHelper.to_utc(berlin_test_case,
pytz.timezone('Europe/Berlin'))
assert dst_test_case == CassandraHelper.to_utc(datetime.datetime(2018,7,1, 12))
assert dst_test_case == CassandraHelper.to_utc(datetime.datetime(2018, 7, 1, 12))
def test_utc_to_Cassandra_EPOCH(self, utc_test_case, dst_test_case):
# utc_test_case = datetime.datetime(1970,1,1, 2, tzinfo=datetime.timezone.utc)
utc2_test_case = datetime.datetime(1970,1,1, 2, tzinfo=pytz.timezone('UTC'))
utc2_test_case = datetime.datetime(1970, 1, 1, 2, tzinfo=pytz.timezone('UTC'))
ts = "10800" + "000000000"
naive = datetime.datetime(1970, 1, 1, 3)
berlin_test_case = pytz.timezone('Europe/Berlin').localize(datetime.datetime(1970,1,1, 3))
berlin_test_case = pytz.timezone('Europe/Berlin').localize(datetime.datetime(1970, 1, 1, 3))
assert isinstance(CassandraHelper.utc_to_Cassandra_EPOCH(utc_test_case), str)
assert ts == CassandraHelper.utc_to_Cassandra_EPOCH(utc_test_case)
......@@ -58,18 +59,18 @@ class TestCassandraHelperClass(object):
def test_to_Cassandra_EPOCH(self, utc_test_case):
assert "10800" + "0" * 9 == CassandraHelper.utc_to_Cassandra_EPOCH(CassandraHelper.to_utc(
utc_test_case))
assert "10800" + "0" * 9 == CassandraHelper.utc_to_Cassandra_EPOCH(CassandraHelper.to_utc(
datetime.datetime(1970, 1, 1, 3)))
assert "10800" + "0" * 9 == CassandraHelper.utc_to_Cassandra_EPOCH(
CassandraHelper.to_utc(utc_test_case))
assert "10800" + "0" * 9 == CassandraHelper.utc_to_Cassandra_EPOCH(
CassandraHelper.to_utc(datetime.datetime(1970, 1, 1, 3)))
def test_cassandra_time(self):
# utc(2016/03/02 14:33:00) utc-epoch: 1456929180 -> 1456932780
assert isinstance(CassandraHelper().cassandra_time("2016/03/02 14:33:00"), str)
assert '1456929180000000000' == CassandraHelper().cassandra_time("2016/03/02 14:33:00")
assert '1456929180000000000' == CassandraHelper.cassandra_time("2016/03/02 14:33:00")
assert '1456929180000000000' == CassandraHelper().cassandra_time(datetime.datetime(2016,
3, 2, 14, 33))
assert '1456929180000000000' == CassandraHelper().cassandra_time(
datetime.datetime(2016, 3, 2, 13, 33, tzinfo=utc))
assert '1456929180000000000' == CassandraHelper.cassandra_time("2016-03-02T14h33m00")
assert '1456929180000000000' == CassandraHelper.cassandra_time("2016-03-02T14:33:00")
assert '1456929180000000000' == CassandraHelper.cassandra_time("2016-03-02T14h33m00")
......@@ -77,15 +78,20 @@ class TestCassandraHelperClass(object):
def test_cassandra_time2dt(self):
assert CassandraHelper().cassandra_time2dt("2016/03/02 14:33:00").tzinfo is None
assert datetime.datetime(2016, 3, 2, 14, 33) == CassandraHelper().cassandra_time2dt("2016/03/02 14:33:00")
assert datetime.datetime(2016, 3, 2, 14, 33) == CassandraHelper.cassandra_time2dt("2016/03/02 14:33:00")
assert datetime.datetime(2016, 3, 2, 14, 33) == CassandraHelper.cassandra_time2dt("2016-03-02T14h33m00")
assert datetime.datetime(2016, 3, 2, 14, 33) == CassandraHelper().cassandra_time2dt("2016-03-02T14:33:00")
assert datetime.datetime(2016, 3, 2, 14,
33) == CassandraHelper().cassandra_time2dt("2016/03/02 14:33:00")
assert datetime.datetime(2016, 3, 2, 14,
33) == CassandraHelper.cassandra_time2dt("2016/03/02 14:33:00")
assert datetime.datetime(2016, 3, 2, 14,
33) == CassandraHelper.cassandra_time2dt("2016-03-02T14h33m00")
assert datetime.datetime(2016, 3, 2, 14,
33) == CassandraHelper().cassandra_time2dt("2016-03-02T14:33:00")
with pytest.raises(ValueError, message="*docstring*"):
CassandraHelper().cassandra_time2dt("12")
with pytest.raises(TypeError, message="*string*"):
CassandraHelper().cassandra_time2dt(12)
@pytest.mark.usefixtures("cleandir", "setup_")
class TestCassandraClass(object):
......@@ -105,18 +111,15 @@ class TestCassandraClass(object):
url = cas.gen_url()
start, end, pv = setup_
assert pv == cas.pv
assert (
"http://ankasr-archiver.anka.kit.edu:9812/archive-access/api/1.0/archive/1/samples/"
"A:SR:BeamInfo:01:Energy?start=1509610210000000000&end=1509610230000000000" == url
)
assert ("http://ankasr-archiver.anka.kit.edu:9812/archive-access/api/1.0/archive/1/samples/"
"A:SR:BeamInfo:01:Energy?start=1509610210000000000&end=1509610230000000000" == url)
cas2 = cassandra.cassandra.Cassandra(
start, end, pv, directory="/tmp", count=1000)
cas2 = cassandra.cassandra.Cassandra(start, end, pv, directory="/tmp", count=1000)
url = cas2.gen_url()
assert (
"http://ankasr-archiver.anka.kit.edu:9812/archive-access/api/1.0/archive/1/samples/"
"A:SR:BeamInfo:01:Energy?start=1509610210000000000&end=1509610230000000000&count=1000" == url
)
"http://ankasr-archiver.anka.kit.edu:9812/archive-access/api/1.0/archive/1/samples/"
"A:SR:BeamInfo:01:Energy?start=1509610210000000000&end=1509610230000000000&count=1000"
== url)
def test_check_connection(
self,
......@@ -146,8 +149,7 @@ class TestCassandraClass(object):
def test_download_cassandra_data_online(self, cas, monkeypatch):
with open(
os.path.dirname(os.path.abspath(__file__)) +
"/data/A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json",
"r") as fobj:
"/data/A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json", "r") as fobj:
json_obj = json.loads(fobj.read())
monkeypatch.setattr(request, "urlopen", request_openurl)
......@@ -172,8 +174,7 @@ class TestCassandraClass(object):
assert pathlib.Path(cleandir / cas.json_filename).is_file()
with open(str(cleandir / cas.json_filename), "r") as got, open(
os.path.dirname(os.path.abspath(__file__)) +
"/data/A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json",
"r") as expected:
"/data/A:SR:BBB:01:X:SRAM:PEAKTUNE2_11:45:00_12:13:00_1000.json", "r") as expected:
assert json.load(expected) == json.load(got)
def test_with(self, setup_dl, monkeypatch):
......@@ -183,11 +184,11 @@ class TestCassandraClass(object):
with cassandra.Cassandra(*setup_dl) as cas2:
assert abs(start - cassandra.tz.naive_as_utc(cas2[0][0])) < datetime.timedelta(
seconds=2)
assert abs(end - cassandra.tz.naive_as_utc(cas2[0][-1])) < datetime.timedelta(
seconds=2)
assert abs(end - cassandra.tz.naive_as_utc(cas2[0][-1])) < datetime.timedelta(seconds=2)
assert isinstance(cas2[0][0], datetime.datetime)
assert isinstance(cas2[1][0], float)
if __name__ == "__main__":
pass
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
# coding=utf-8
import pytz
import cassandra
import pytest
import pytz
from cassandra.cassandra import CassandraHelper
from cassandra.tz import *
# from .conftest import request_openurl
@pytest.mark.usefixtures("setup_", "utc_test_case", "dst_test_case")
class TestTz(object):
......@@ -54,7 +54,7 @@ class TestTz(object):
assert aware_summer_berlin == aware_rename_tz(naive_summer, "Europe/Berlin")
def test_aware_utc_to_berlin(self):
aware_summer = datetime.datetime(2017, 7, 11, 10, tzinfo=pytz.utc)
aware_summer = datetime.datetime(2017, 7, 11, 10, tzinfo=pytz.utc)
aware_winter = datetime.datetime(2017, 11, 11, 10, tzinfo=pytz.utc)
assert aware_summer == aware_utc_to_berlin(aware_summer)
assert "CEST" == aware_utc_to_berlin(aware_summer).tzname()
......@@ -98,7 +98,7 @@ class TestTz(object):
aware_to_naive_utc(naive_winter)
def test_aware_to_naive_berlin(self):
pass #assert aware_to_naive_utc()
pass #assert aware_to_naive_utc()
berlin = pytz.timezone('Europe/Berlin')
naive_winter = datetime.datetime(2017, 11, 11, 10)
naive_summer = datetime.datetime(2017, 7, 11, 10)
......@@ -108,6 +108,20 @@ class TestTz(object):
aware_winter_berlin = berlin.normalize(berlin.localize(datetime.datetime(2017, 11, 11, 10)))
assert aware_to_naive_berlin(aware_winter).tzname() is None
def aware_as_naive(self):
pass
\ No newline at end of file
pass
def test_naive_berlin_to_utc(self):
berlin = pytz.timezone('Europe/Berlin')
naive_winter = datetime.datetime(2017, 11, 11, 10)
naive_summer = datetime.datetime(2017, 7, 11, 10)
aware_summer = datetime.datetime(2017, 7, 11, 10, tzinfo=pytz.utc)
aware_winter = datetime.datetime(2017, 11, 11, 10, tzinfo=pytz.utc)
aware_summer_berlin = datetime.datetime(2017, 7, 11, 10, tzinfo=berlin)
assert aware_winter - datetime.timedelta(hours=1) == naive_berlin_to_utc(naive_winter)
assert aware_summer - datetime.timedelta(hours=2) == naive_berlin_to_utc(naive_summer)
assert naive_berlin_to_utc(naive_winter).tzinfo == utc
with pytest.raises(ValueError, message="*already*"):
naive_berlin_to_utc(aware_winter)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment