Commit e1e04b7b authored by Robin Schnaidt

Add Teco Helper

parent 46b5734a
from typing import Optional, Dict, List, Tuple
import numpy as np
import pandas as pd
from tecohelper.config import LABELS, RTLS
from tecohelper.recording import Recording
class AnvilHelper(object):
def __init__(self, h5file, recording_key, anvil_export):
self.recording = Recording(h5file, recording_key)
self.anvil_annotations = pd.read_csv(anvil_export, sep=';')
self._tokens = None
self._end_annotation = None
self._start_annotation = None
def _get_token_rows(self, label_field):
"""
Returns a dataframe to where each row contains start and end frame for
an occurrence of the given label inside the recording.
"""
self.anvil_annotations['block'] = (
self.anvil_annotations[label_field].shift(1) !=
self.anvil_annotations[label_field]).astype(int).cumsum()
current_tokens = (
self.anvil_annotations.reset_index().groupby(
['block', label_field])['index'].apply(np.array).reset_index())
current_tokens = current_tokens[current_tokens[label_field] > 0]
# current_tokens = current_tokens[current_tokens[label_field] == 0]
current_tokens.drop(['block', label_field], axis=1, inplace=True)
return current_tokens
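    # The shift/cumsum pattern used above is a common pandas idiom for
    # run-length grouping; a small sketch with made-up values:
    #
    #   s = pd.Series([0, 0, 2, 2, 0, 1])
    #   (s.shift(1) != s).astype(int).cumsum()  # -> [1, 1, 2, 2, 3, 4]
    #
    # Every run of equal label values gets its own block id, so grouping by
    # (block, label) yields one group per consecutive label occurrence.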
    @property
    def start_annotation(self) -> int:
        if self._start_annotation is None:
            rws = self._get_token_rows("init.start_recording")
            # Use the middle frame of the start-annotation token as the
            # reference start frame.
            self._start_annotation = rws.iloc[0][0][
                int(len(rws.iloc[0][0]) / 2)]
        return self._start_annotation
    @property
    def end_annotation(self) -> int:
        if self._end_annotation is None:
            rws = self._get_token_rows("init.end_recording")
            # Use the middle frame of the end-annotation token as the
            # reference end frame.
            self._end_annotation = rws.iloc[0][0][int(len(rws.iloc[0][0]) / 2)]
        return self._end_annotation
@property
def frames_in_recording(self) -> int:
return self.end_annotation - self.start_annotation
@property
    def tokens(self) -> Dict[str, List[Tuple[Tuple[int, int], int]]]:
        """
        A dictionary mapping each label to a list of
        ((start_frame, end_frame), subtype) tuples, one entry per occurrence
        of that label inside the recording.
        """
        if self._tokens is None:
            rws = {}
            for lbl in LABELS:  # type: str
                brdrs = []
                cf = self._get_token_rows(lbl)
                for _, row in cf.iterrows():
                    start_frame = row[0][0]  # type: int
                    end_frame = row[0][-1]  # type: int
                    # The label column holds the token subtype; 0 marks
                    # frames without a token and is filtered out in
                    # _get_token_rows.
                    token_subtype = self.anvil_annotations[lbl][
                        row[0][0]]  # type: int
                    brdrs.append(((start_frame, end_frame), token_subtype))
                rws[lbl] = brdrs
            self._tokens = rws
        return self._tokens
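    # Shape of the cached structure, with made-up frame numbers:
    #
    #   self.tokens == {
    #       'movement.walk': [((120, 180), 1), ((400, 455), 1)],
    #       'movement.stand': [((181, 399), 1)],
    #       ...
    #   }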
    def convert_frame_borders_to_timestamps(self, sensor_name: str,
                                            frame_start: int, frame_end: int):
        """
        Converts video frame borders to timestamps for a given sensor,
        usable as an index into the recording.
        """
        if "start {}".format(sensor_name) in self.recording.annotations.keys():
            # Map frames linearly onto the sensor's time span.
            sensor_start_timestamp = self.recording.get_start_timestamp(sensor_name)
            sensor_end_timestamp = self.recording.get_end_timestamp(sensor_name)
            framedelta = (sensor_end_timestamp - sensor_start_timestamp) / self.frames_in_recording
            return (sensor_start_timestamp + (frame_start - self.start_annotation) * framedelta,
                    sensor_start_timestamp + (frame_end - self.start_annotation) * framedelta)
        else:
            # Sensors without their own start annotation fall back to the
            # left IMU's time base.
            return self.convert_frame_borders_to_timestamps(self.recording.left_imu_sensor,
                                                            frame_start, frame_end)
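    # Worked example of the linear mapping above, with made-up numbers: for
    # start_annotation = 100 and a sensor spanning 20 s over
    # frames_in_recording = 1000 frames, framedelta = 20 ms, so frame 150
    # maps to sensor_start_timestamp + (150 - 100) * 20 ms = +1 s.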
def get_token_dataframe_for_sensor(self, sensor_name: str,
frame_start: int,
frame_end: int) -> pd.DataFrame:
"""
Returns a DataFrame with sensor data of a single sensor for a specified
slice of the recording, given by video frame borders.
"""
timestamp_start, timestamp_end = self.convert_frame_borders_to_timestamps(
sensor_name, frame_start, frame_end)
df = self.recording.get_sensor_slice_df(sensor_name, timestamp_start,
timestamp_end)
return df
    def get_token_dataframe(self, label: str,
                            token_index: int) -> Optional[pd.DataFrame]:
"""
Returns a DataFrame with sensor data for a selected token from the
recording.
:param label: The label of the token to extract data for
:type label: str
:param token_index: Repetition index of the token (so n-1 for the nth
occurrence inside the data)
:type token_index: int
        :returns: A DataFrame with the data of all sensors for the duration
            of the token, reindexed and resampled to an approximate sample
            rate of 50 Hz
        :rtype: pd.DataFrame
"""
if self.tokens is None:
return None
label_tokens = self.tokens[label]
if token_index >= len(label_tokens):
print(f"Only {len(label_tokens)} tokens available "
f"for label {label}!")
return None
(frame_start, frame_end), token_subtype = label_tokens[token_index]
sensor_dfs = {}
for sensor in self.recording.sensors:
# get data frame for single sensor
sensor_df = self.get_token_dataframe_for_sensor(sensor,
frame_start,
frame_end)
# drop unused columns from frame
cols_to_keep = ['device_address', 'device_timestamp',
'packet_number',
'update_type']
cols_to_drop = []
if sensor == self.recording.left_imu_sensor:
cols_to_keep.append('received_timestamp')
colpref = "left"
elif sensor == self.recording.right_imu_sensor:
colpref = "right"
elif sensor == self.recording.hip_imu_sensor:
colpref = "hip"
            else:
                # RTLS sensor: its columns are prefixed with the device id.
                if any(RTLS in x for x in sensor_df.columns):
                    colpref = RTLS
                else:
                    colpref = "rtls"
            for col in sensor_df.columns:
                if not (colpref in col or col in cols_to_keep):
                    cols_to_drop.append(col)
            sensor_df = sensor_df.drop(columns=cols_to_drop)
            # rename the RTLS device-id prefix to a plain "rtls" prefix
            if any(RTLS in x for x in sensor_df.columns):
                sensor_df.columns = [x.replace(RTLS, 'rtls')
                                     for x in sensor_df.columns]
sensor_dfs[sensor] = sensor_df
# concatenate dfs and reindex to LEFT
concatenation = [sensor_dfs[self.recording.left_imu_sensor].reset_index(drop=True), ]
concat_names = [self.recording.left_imu_sensor]
for sensor in self.recording.sensors:
if sensor == self.recording.left_imu_sensor:
continue
if "start {}".format(sensor) in self.recording.annotations.keys():
# left and right might differ in index
concatenation.append(sensor_dfs[sensor].reset_index(drop=True))
concat_names.append(sensor)
continue
# drop duplicated index entries (only applicable for RTLS)
sensor_dfs[sensor] = sensor_dfs[sensor].loc[
~sensor_dfs[sensor].index.duplicated(keep='first')]
# reindex to LEFT
sensor_dfs[sensor] = sensor_dfs[sensor].reindex(
sensor_dfs[self.recording.left_imu_sensor].index,
method="nearest")
concatenation.append(sensor_dfs[sensor].reset_index(drop=True))
concat_names.append(sensor)
        # actual concatenation on column axis
        concatenated = pd.concat(concatenation, axis=1).dropna()
        # resample to an approximate sample rate of 50 Hz (one sample every
        # 20 ms), forward-filling gaps between samples
        sampling_rate = 50  # Hz
        frame_delay = "{}ms".format(int(1000 / sampling_rate))
        concatenated = concatenated.set_index("received_timestamp")
        concatenated = concatenated.resample(frame_delay).mean().ffill()
        return concatenated
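A minimal usage sketch for AnvilHelper, assuming a recording stored in an HDF5 file together with a matching Anvil CSV export; the module path, file names and recording key below are hypothetical:

from tecohelper.anvil_helper import AnvilHelper  # module path assumed

# hypothetical file names and recording key
helper = AnvilHelper("recordings.h5", "recording_01", "anvil_export.csv")
print(helper.frames_in_recording)
# sensor data for the first walking token, resampled to ~50 Hz
walk_df = helper.get_token_dataframe("movement.walk", 0)
if walk_df is not None:
    print(walk_df.shape)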
RIGHT = "CE:DB:55:5F:37:4E"
LEFT = "D2:E9:B3:C0:BC:BE"
HIP = "E5:05:6B:A0:49:4B"
RIGHT_OLD_MACS = ["IMU c8:e7:19:be:c8:80",
                  "IMU fb:81:8a:61:af:ef",
                  "IMU fb:51:83:d4:9a:8c",
                  "IMU fa:24:ac:f5:51:2e"]
LEFT_OLD = "fd:81:6a:2b:83:7d"
HIP_OLD = "d2:e9:b3:c0:bc:be"
RTLS = "70b3d587900101ad"
LEFT_SENSOR = f"IMU {LEFT}"
RIGHT_SENSOR = f"IMU {RIGHT}"
HIP_SENSOR = f"IMU {HIP}"
LEFT_SENSOR_OLD = f"IMU {LEFT_OLD}"
HIP_SENSOR_OLD = f"IMU {HIP_OLD}"
RTLS_SENSOR = f"RTLS {RTLS}"
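# These sensor keys match the values of the `update_type` column in the
# recording DataFrames (see Recording.sensors), e.g. "IMU CE:DB:55:5F:37:4E".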
LABELS = [
'low-level.hand-extend:direction_of_extention',
'low-level.take-piece:piece_direction_on_table',
'low-level.drop-piece:bin_number',
'low-level.hand-retract:hand_specification',
'low-level.switch-hand:from_to_hand_spec',
'movement.stand',
'movement.walk'
]
import tables
class H5FileHelper(object):
def __init__(self, filename):
self.filename = filename
    @property
    def recordings(self):
        with tables.open_file(self.filename, mode="r") as h5file:
            recording_names = list(h5file.root._v_children.keys())
        recording_names = [x for x in recording_names
                           if "label" not in x and "annotation" not in x]
        recording_names.sort()
        return recording_names
    @property
    def annotations(self):
        with tables.open_file(self.filename, mode="r") as h5file:
            recording_annotations = list(h5file.root._v_children.keys())
        recording_annotations = [x for x in recording_annotations
                                 if "annotation" in x]
        recording_annotations.sort()
        return recording_annotations
def is_annotated(self, recording_name):
        return any(recording_name in x for x in self.annotations)
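A short sketch of how H5FileHelper might be used to list the annotated recordings in a file; the module path and file name are hypothetical:

from tecohelper.h5file_helper import H5FileHelper  # module path assumed

files = H5FileHelper("recordings.h5")  # hypothetical file name
annotated = [r for r in files.recordings if files.is_annotated(r)]
print(annotated)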
import numpy as np
import pandas as pd
from tecohelper.config import (HIP_SENSOR, LEFT_SENSOR, RIGHT_SENSOR,
                               LEFT_SENSOR_OLD, HIP_SENSOR_OLD, RTLS_SENSOR,
                               RIGHT_OLD_MACS)
class Recording(object):
def __init__(self, filename, recording_key):
self._recording_key = recording_key
self._filename = filename
self._df = pd.read_hdf(filename, key="/{}".format(recording_key))
self._tokens = None
self._annotations = None
self._start_and_stop_frames = None
self._sensors = None
@property
def tokens(self):
if self._tokens is None:
try:
self._tokens = pd.read_hdf(self._filename,
key="/{}_labels".format(
self._recording_key
))
except KeyError:
self._tokens = None
return self._tokens
@property
def annotations(self):
if self._annotations is None:
try:
self._annotations = pd.read_hdf(self._filename,
key="/{}_annotations".format(
self._recording_key
))
except KeyError:
self._annotations = None
return self._annotations
@property
def right_imu_sensor(self):
if RIGHT_SENSOR in self.sensors:
return RIGHT_SENSOR
else:
for snsr in self.sensors:
if snsr in RIGHT_OLD_MACS:
return snsr
    @property
    def left_imu_sensor(self):
        # The new right IMU only appears in newer recordings, so its
        # presence is used to tell the sensor generations apart.
        if RIGHT_SENSOR in self.sensors:
            return LEFT_SENSOR
        else:
            return LEFT_SENSOR_OLD
@property
def hip_imu_sensor(self):
if RIGHT_SENSOR in self.sensors:
return HIP_SENSOR
else:
return HIP_SENSOR_OLD
@property
def right_imu_dataframe(self):
return self._df[self._df["update_type"] == self.right_imu_sensor]
@property
def left_imu_dataframe(self):
return self._df[self._df["update_type"] == self.left_imu_sensor]
@property
def hip_imu_dataframe(self):
return self._df[self._df["update_type"] == self.hip_imu_sensor]
    @property
    def sensors(self):
        grpby = self._df.groupby("update_type")
        # keep every IMU sensor; of the RTLS entries keep only the known tag
        return [x for x in list(grpby.groups.keys())
                if "RTLS" not in x or x == RTLS_SENSOR]
    @staticmethod
    def find_closest_index(df, column, value):
        exactmatch = df[df[column] == value]
        if not exactmatch.empty:
            return exactmatch.index
        else:
            # No exact match: look for the nearest neighbours below and
            # above the requested value.
            lowerneighbour_ind = df[df[column] < value][column]
            if lowerneighbour_ind.shape[0] > 0:
                lowerneighbour_ind = lowerneighbour_ind.idxmax()
            else:
                lowerneighbour_ind = df.index[0]
            upperneighbour_ind = df[df[column] > value][column]
            if upperneighbour_ind.shape[0] > 0:
                upperneighbour_ind = upperneighbour_ind.idxmin()
            else:
                upperneighbour_ind = df.index[-1]
            lv = (np.max(df.at[lowerneighbour_ind, column])
                  if isinstance(df.at[lowerneighbour_ind, column], np.ndarray)
                  else df.at[lowerneighbour_ind, column])
            hv = (np.min(df.at[upperneighbour_ind, column])
                  if isinstance(df.at[upperneighbour_ind, column], np.ndarray)
                  else df.at[upperneighbour_ind, column])
            # Return whichever neighbour is closer to the requested value.
            lower_val = value - lv
            higher_val = hv - value
            if higher_val > lower_val:
                return lowerneighbour_ind
            else:
                return upperneighbour_ind
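    # Sketch of find_closest_index with made-up data: for a column holding
    # [1.0, 4.0, 9.0] and value 5.0, there is no exact match, the lower
    # neighbour is 4.0 and the upper neighbour is 9.0; 5.0 - 4.0 = 1.0 is
    # smaller than 9.0 - 5.0 = 4.0, so the index of the 4.0 row is returned.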
def get_sensor_slice_df(self, sensor_name, first_timestamp, last_timestamp):
start_sensor = Recording.find_closest_index(
self._df[self._df["update_type"] == sensor_name],
"received_timestamp",
first_timestamp
)
end_sensor = Recording.find_closest_index(
self._df[self._df["update_type"] == sensor_name],
"received_timestamp",
last_timestamp
)
if isinstance(start_sensor, pd.DatetimeIndex):
start_sensor = start_sensor.values.astype(np.int64)
start_sensor = pd.Timestamp(start_sensor[0], unit='ns')
if isinstance(end_sensor, pd.DatetimeIndex):
end_sensor = end_sensor.values.astype(np.int64)
end_sensor = pd.Timestamp(end_sensor[0], unit='ns')
return self._df[self._df["update_type"] == sensor_name].loc[
start_sensor:end_sensor]
def _get_sensor_token_df(self, sensor_name, token):
start_timestamp = self.annotations[
self.annotations["label"] == "init.start_recording"].iloc[0][
"start {}".format(sensor_name)]
end_timestamp = self.annotations[
self.annotations["label"] == "init.end_recording"].iloc[0][
"end {}".format(sensor_name)]
return self._df[self._df["update_type"] == sensor_name].loc[
start_timestamp:end_timestamp]
def _get_sensor_token_df_no_stamps(self, sensor_name, token):
start_left = self.annotations[
self.annotations["label"] == "init.start_recording"].iloc[0][
"start {}".format(self.left_imu_sensor)]
end_left = self.annotations[
self.annotations["label"] == "init.end_recording"].iloc[0][
"end {}".format(self.left_imu_sensor)]
start_sensor = Recording.find_closest_index(
self._df[self._df["update_type"] == sensor_name],
"received_timestamp",
start_left
)
end_sensor = Recording.find_closest_index(
self._df[self._df["update_type"] == sensor_name],
"received_timestamp",
end_left
)
return self._df[self._df["update_type"] == sensor_name].loc[
start_sensor:end_sensor]
def get_start_timestamp(self, sensor_name):
if "start {}".format(sensor_name) in self.annotations.keys():
return self.annotations[
self.annotations["label"] == "init.start_recording"].iloc[0][
"start {}".format(sensor_name)]
else:
start_left = self.annotations[
self.annotations["label"] == "init.start_recording"].iloc[0][
"start {}".format(self.left_imu_sensor)]
return Recording.find_closest_index(
self._df[self._df["update_type"] == sensor_name],
"received_timestamp",
start_left
)
    def get_end_timestamp(self, sensor_name):
        if "start {}".format(sensor_name) in self.annotations.keys():
            return self.annotations[
                self.annotations["label"] == "init.end_recording"].iloc[0][
                "start {}".format(sensor_name)]
        else:
            # Fall back to the left IMU's end annotation and find the
            # closest matching index for this sensor.
            end_left = self.annotations[
                self.annotations["label"] == "init.end_recording"].iloc[0][
                "start {}".format(self.left_imu_sensor)]
            return Recording.find_closest_index(
                self._df[self._df["update_type"] == sensor_name],
                "received_timestamp",
                end_left
            )
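A sketch tying the pieces together: open a recording and pull a slice of left-IMU data between two timestamps. The file name, recording key and timestamps are hypothetical:

import pandas as pd

from tecohelper.recording import Recording

rec = Recording("recordings.h5", "recording_01")  # hypothetical key
print(rec.sensors)  # all update_type values, RTLS filtered to the known tag

# slice the left IMU between two (hypothetical) received timestamps
start = pd.Timestamp("2019-01-01 12:00:00")
end = pd.Timestamp("2019-01-01 12:00:10")
left_slice = rec.get_sensor_slice_df(rec.left_imu_sensor, start, end)
print(left_slice.shape)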