Commit b6c3d4eb authored by readout's avatar readout
Browse files

-

parent e19dd1db
# p3-alibava-praktikum
# Das EASy-Praktikum
Dieser Versuch soll dazu dienen, den Studenten auf eine zugängliche Art und Weise die Funktionsweise von Siliziumstreifensensoren näherzubringen. Für die Datennahme wird dazu das **E**ducational **A**libava **Sy**stem von Alibava-Systems verwendet. Die Auswertung der Daten erfolgt mit einem Python-Framework, das den Namen `alibava-analysis` trägt.
## Zur Nutzung von `alibava-analysis`
Das Skript, das ausgeführt werden muss, heißt `main.py`.
Das Skript benötigt 3 Run-Dateien, um zu funktionieren (welche das sind, lernt ihr während der Bearbeitung der Aufgaben!).
Eine Hilfe zur Benutzung erhaltet ihr wie gewohnt mit
```bash
./main.py -h oder python main.py
```
```
usage: main.py [-h] [--custom_range CUSTOM_RANGE CUSTOM_RANGE] [--mask MASK [MASK ...]] [--preview] [--verbose] [--save] [--show] signal_run pedestal_run calibration_run
positional arguments:
signal_run File path to the signal run.
pedestal_run File path to the pedestal run.
calibration_run File path to the calibration run.
optional arguments:
-h, --help show this help message and exit
--custom_range CUSTOM_RANGE CUSTOM_RANGE
Specify a custom range of channels to be analyzed. The range is set by two numbers in succession, e.g. --custom_range 1 100
--mask MASK [MASK ...]
Specify the strips to be masked. Defaults to none if nothing is specified.
--preview Show a preview of the noise map (opens a new window).
--verbose, -v Verbose output
--save, -s Save output to pdf. The plots are saved in /plots.
--show, -S Show plots (opens a new window with a summary of all plots).
```
\ No newline at end of file
# Alibava Analysis
## Usage
```
usage: main.py [-h] [--mask MASK [MASK ...]] [--preview] [--custom_range CUSTOM_RANGE [CUSTOM_RANGE ...]] [--verbose] [--save] [--show] [--database]
gain pedestal signal {0,1}
positional arguments:
gain gain file
pedestal pedestal file
signal signal file
{0,1} which chip has been bonded (0 or 1)?
optional arguments:
-h, --help show this help message and exit
--mask MASK [MASK ...]
Specify the strips to be masked. Defaults to none if nothing is specified
--preview Show a preview of the noise map.
--custom_range CUSTOM_RANGE [CUSTOM_RANGE ...]
Specify a custom range of channels. Defaults to none if nothing is specified
--verbose, -v Verbose output
--save, -s Save output to pdf
--show, -S Show plots.
--database, -d Communicate with database.
```
## Building the docs
Run ```make html``` in the root directory. Resulting docs can be viewed in ```_build/html/index.html```.
\ No newline at end of file
#pylint: disable=no-absolute-import, missing-module-docstring
from .gain import GainScan
from .pedestal import Pedestal
from .raw import Raw
from .clustering import Clustering
from .signal import Signal
from .fitting import Fitting
from .plotting import Plotting
from .decorators import timeit
"""
The `clustering` module
=======================
The `clustering` module is the heart of this analysis framework. It performs all major analyses
including: clustering, calculation of characteristic functions such as the eta function
(see :func:`analysis.clustering.eta_function`).
Clustering groups signals from neighboring channels into one cluster. The highest strip is called
`seed` and is defined by having a signal that lies above `self.seed_cut`. Neighbors have to have
signals higher than `self.neighbor_cut`.
The module then extracts all relevant parameters from the analysis such as cluster sizes,
cluster/seed signals and so on (for a full list of parameters, see
:class:`analysis.plotting.Plotting`)
"""
# pylint: disable=no-absolute-import
from __future__ import division
import logging
import numpy as np
import pandas as pd
from .decorators import timeit
logging.getLogger("matplotlib").setLevel("WARNING")
logging.getLogger("PIL").setLevel("WARNING")
class Clustering():
"""Class representing the clustering procedure of signals induced on the strips per event.
Seeds are defined as signals induced on strips whose magnitudes lie above a user-defined
seed cut (`self.seed_cut`, default: 5).
Neighbors are neighboring those seeds and their signals have to lie above a user-defined
neighbor cut (`self.neighbor_cut`, default: 2).
Clusters of signals defined like above are summarized.
:param ped_run: A handle to the :class:`analysis.pedestal.Pedestal` pedestal object containing
the corresponding pedestal run
:type ped_run: :class:`analysis.pedestal`
:param sig_run: A handle to the :class:`analysis.signal.Signal` signal object containing the
corresponding signal run
:type sig_run: :class:`analysis.signal.Signal`
:param gain_run: A handle to the :class:`analysis.gain.Gain` gain object containing the
corresponding gain run
:type gain_run: :class:`analysis.gain.Gain`
:param seed_cut: Seed cut defining seed strips as signals that lie above the
`seed_cut * ped_run.cm_subtracted_noise` threshold.
:type seed_cut: float
:param neighbor_cut: Neighbor cut defining neighbor strips as strips with signals that lie
above the `neighbor_cut * ped_run.cm_subtracted_noise` threshold.
:type neighbor_cut: float
"""
def __init__(self, ped_run, sig_run, gain_run, **kwargs):
"""Constructor method
"""
self.log = logging.getLogger("analysis.{0}".format(self.__class__.__name__))
self.seed_cut = kwargs.get("seed_cut", 5)
self.neighbor_cut = kwargs.get("neighbor_cut", 1)
self.seed = None
self.neighbor = None
self.eta = np.array([])
self._calc_clusters(ped_run, sig_run, gain_run)
@timeit("s")
def _calc_clusters(self, ped_run, sig_run, gain_run): # pylint: disable=too-many-locals
"""Utility function calculating clustering of induced signals.
:param ped_run: A handle to the :class:`analysis.pedestal.Pedestal` pedestal object
containing the corresponding pedestal run
:type ped_run: :class:`analysis.pedestal`
:param sig_run: A handle to the :class:`analysis.signal.Signal` signal object containing the
corresponding signal run
:type sig_run: :class:`analysis.signal.Signal`
:param gain_run: A handle to the :class:`analysis.gain.Gain` gain object containing the
corresponding gain run
:type gain_run: :class:`analysis.gain.Gain`
"""
self.log.info("Calculating clusters...")
# Define seeds and neighbors as signals above predefined seed + neighbor cuts
self.log.debug(
"Define seeds and neighbors as signals above predefined seed + neighbor cuts")
self.seed = sig_run.signal > self.seed_cut*ped_run.cm_subtracted_noise
self.neighbor = sig_run.signal > self.neighbor_cut*ped_run.cm_subtracted_noise
# Differentiate neighbors to get the bounds
self.log.debug("Differentiate neighbors to get the bounds, step 0")
neighbor_limits = (self.neighbor*1)
self.log.debug("Differentiate neighbors to get the bounds, step 1")
neighbor_limits = neighbor_limits.diff(axis=1)
self.log.debug("Fill first column NaN to 0")
# self.log.debug(neighbor_limits)
neighbor_limits.iloc[:, 0].fillna(0, inplace=True)
self.log.debug("Remove cluster ending, clip negative derivative to 0")
# self.log.debug(neighbor_limits)
neighbor_limits = neighbor_limits.clip(lower=0)
self.log.debug("Calculate cummulative sum")
neighbor_limits = neighbor_limits.cumsum(axis=1)
self.log.debug(
"Multiply with neightbor candidate strips to get cluster candidates")
neighbor_limits = neighbor_limits.multiply(1*self.neighbor)
self.log.debug(
"Multiply with seed strips to get list of valid clusters")
neighbor_seeds = neighbor_limits.multiply(1*self.seed)
# Identify clusters, discard strips/events that don't belong to a cluster
self.log.debug(
"Identify clusters, discard strips/events that don't belong to a cluster")
clusters = list(map(set, neighbor_seeds.values))
for cluster in clusters:
cluster.discard(0)
valid_clusters_df = pd.DataFrame(clusters)
valid_indices = pd.unique(valid_clusters_df.values.ravel())
valid_indices = valid_indices[~np.isnan(valid_indices)]
self.clustering_df = pd.DataFrame()
# Calculate extended mask of automasked channels to discard clusters
# neighboring masked strips
self.log.debug("Calculate extended mask of automasked channels")
edge_strips = np.array([ped_run.mask[0], ped_run.mask[-1]])
if ped_run.masked_channels.size != 0:
plusones = ped_run.masked_channels + 1
minusones = ped_run.masked_channels - 1
extended_mask = np.concatenate((plusones, minusones, edge_strips), axis=None)
extended_mask = np.unique(extended_mask)
else:
extended_mask = np.array(edge_strips)
# Loop through identified clusters
self.log.debug("Loop through identified clusters")
for cl_idx in sorted(valid_indices):
# Select clusters that do not overlap with the extended mask
cluster = sig_run.signal[neighbor_limits == cl_idx][valid_clusters_df.isin(
[cl_idx]).any(axis=1)]
cluster = cluster[~cluster[cluster.columns.intersection(extended_mask)].any(axis=1)]
# Get seed and cluster positions
seed_pos = cluster.idxmax(axis=1)
cl_pos = signal_center(cluster)
# Get noise values for clusters
cluster_noise_adc = ped_run.cm_subtracted_noise.multiply(cluster.notna()) ** 2
cluster_noise = cluster_noise_adc.multiply(gain_run.gain_per_channel ** 2).sum(axis=1)
cluster_noise_adc = cluster_noise_adc.sum(axis=1)
# Get noise values for seeds
seed_noise_adc = ped_run.cm_subtracted_noise[seed_pos].values
seed_noise = np.multiply(
seed_noise_adc, gain_run.gain_per_channel[seed_pos].values)
# Eta function calculation
self.eta = np.append(self.eta, sig_run.signal.loc[seed_pos.index].apply(
eta_function, axis=1).values)
# Get relevant cluster and noise properties
res = {'size': cluster.count(axis=1),
'signal': cluster.multiply(gain_run.gain_per_channel).sum(axis=1),
'signal_adc': cluster.sum(axis=1),
'seed_signal': cluster.multiply(gain_run.gain_per_channel).max(axis=1),
'seed_signal_adc': cluster.max(axis=1),
'noise': np.sqrt(cluster_noise / cluster.count(axis=1)),
'noise_adc': np.sqrt(cluster_noise_adc / cluster.count(axis=1)),
'seed_noise': seed_noise,
'seed_noise_adc': seed_noise_adc,
'seed_pos': seed_pos,
'cluster_pos': cl_pos,
'event': cluster.index,
'time': sig_run.timing.iloc[cluster.index][0],
}
self.clustering_df = self.clustering_df.append(pd.DataFrame(res), ignore_index=True)
# Discard all nan values encountered in self.eta
self.eta = self.eta[~np.isnan(self.eta)]
def __repr__(self):
return repr(self.clustering_df)
def signal_center(signal_per_strip):
"""Utitlity function calculating the nearest strip to a weighted signal center.
:param signal_per_strip: A series containing the signals per strip number.
:type signal_per_strip: :class:`pandas.Series`
:return weighted_strips: The strip nearest to the weighted center of signals.
:rtype: int
"""
strips = signal_per_strip.columns
signals = signal_per_strip.values
median = np.nansum(np.multiply(strips, signals), axis=1)
total_signal = np.nansum(signals, axis=1)
weighted_strips = np.around(np.divide(median, total_signal))
return weighted_strips
def eta_function(event):
"""Utility function calculating the eta function for a given cluster.
The eta function is calculated by considering the seed strip and the strip with the next high
signal.
Then, the eta function is defined as
eta(e) = sig_r/(sig_r + sig_l),
where e denotes an event and sig_r/l denote the right/left strip's signal (out of the two
strips above).
:param event: A series containing signal heights at identified cluster locations
and np.nan everywhere else.
:type event: :class:`pandas.Series`
:return eta: The value of eta(event).
:rtype: float, np.nan
"""
# Maybe this is faster. Spoiler alert: it is.
strips = event.values
# Since we already discarded clusters in proximity of skipped channels, indexing like this
# shouldn't be a problem
try:
seed = strips[np.nanargmax(strips)]
right = strips[np.nanargmax(strips) + 1]
left = strips[np.nanargmax(strips) - 1]
except IndexError:
return np.nan
if left > right:
eta = seed / (seed + left)
else:
eta = right / (seed + right)
return eta
"""
The `decorators` module
=======================
The `decorators` module provides miscellaneous decorators for use in code. At the time of composing
this documentation, the only provided decorator is used for timing code blocks.
"""
from __future__ import absolute_import

import functools
import logging
import time
LOG = logging.getLogger("analysis.CodeTimer")
def timeit(mode):
"""This decorator times a function call
:param mode: Mode of the code-timer. Allowed modes are 'ms' and 's', corresponding to the
timing precision.
:type mode: str
:return: Decorator function corresponding to the mode.
:rtype: func
"""
def timer_decorator_ms(func):
def wrapper_ms(*args, **kwargs):
start = time.time()
func(*args, **kwargs)
end = time.time()
LOG.debug("Elapsed time for %s: %s ms",
func.__name__, end - start)
return wrapper_ms
def timer_decorator_s(func):
def wrapper_s(*args, **kwargs):
start = time.time()
func(*args, **kwargs)
end = time.time()
LOG.debug("Elapsed time for %s: %s min:sec", func.__name__,
time.strftime("%M:%S", time.gmtime(end - start)))
return wrapper_s
if mode.lower() == "ms":
return timer_decorator_ms
if mode.lower() == "s":
return timer_decorator_s
return timer_decorator_ms
"""
The `fitting` module
====================
The `fitting` module is responsible for fitting those nice little curves to those distributions
you're seeing.
It is constructed in such a way that implementing new kinds of functions to the distributions can be
done by future maintainers of the code at ease. At the time of composing this documentation, two
functions are available for use: *gauss* and *langau* fits.
Initial fit parameters are guessed by :meth:`analysis.fitting.Fitting._guess_params` and their
corresponding `_params_function(...)` functions.
"""
# pylint: disable=no-absolute-import
from __future__ import division
import logging
import numpy as np
import pylandau
from scipy.optimize import curve_fit
from .decorators import timeit
logging.getLogger("matplotlib").setLevel("WARNING")
logging.getLogger("PIL").setLevel("WARNING")
class Fitting():
"""Class responsible for fitting curves to distributions.
:param hist: Values of the distribution
:type hist: :class:`np.ndarray`
:param edges: Bin edges of the distribution
:type edges: :class:`np.ndarray`
:param func: Function to fit to the distribution.
Currently supported: `gauss`, `langau`
:type func: str
:param params: Intial parameters to start off optimizing at
:type params: set
"""
@timeit("s")
def __init__(self, hist, edges, **kwargs):
"""Constructor method
"""
self.log = logging.getLogger(
"analysis.{0}".format(self.__class__.__name__))
# Get center values of bins
self.bincenters = np.array(0.5 * (edges[1:] + edges[:-1]))
eta = kwargs.get("eta", 1000)
# Get the function
func = kwargs.pop(
"func", pylandau.langau) # pylint: disable=c-extension-no-member
if func == "gauss":
func = gauss
# Get estimations for parameters (auto-find if None)
params = kwargs.pop("params", None)
if params is None:
params = self._guess_params(hist, func, eta)
self._fit(hist, params, func)
def _fit(self, hist, params, func):
"""Utility function fitting an arbitrary curve to a distribution.
:param hist: Values of the distribution
:type hist: :class:`np.ndarray`
:param params: Intial parameters to start off optimizing at
:type params: set
:param func: Function to fit to the distribution.
Currently supported: `gauss`, `langau`
:type func: str
"""
self.coeff, self.pcov = curve_fit(func, self.bincenters, hist, # pylint: disable=unbalanced-tuple-unpacking
p0=params)
self.fig = func(self.bincenters, *self.coeff)
self.log.info("Fit parameters: (%s)", self.coeff)
def _guess_params(self, hist, func, eta):
"""Estimates first fit parameter guesses.
:param hist: Values of the distribution
:type hist: :class:`np.ndarray`
:param func: Function to fit to the distribution.
Currently supported: `gauss`, `langau`
:type func: str
:param eta: Eta value for the Langau function to use.
:type eta: float
:raises NameError: if no valid fit function was provided
:return params: Estimated initial fit parameters.
:rtype: set
"""
if func == pylandau.langau: # pylint: disable=c-extension-no-member
params = self._params_langau(hist, eta)
elif func == gauss: # pylint: disable=comparison-with-callable
params = self._params_gauss(hist)
else:
raise NameError(
f"No appropriate fit function was defined: {func}.")
return params
def _params_langau(self, hist, eta):
"""Estimate initial parameters for a langau distribution.
:param hist: Values of the distribution
:type hist: :class:`np.ndarray`
:param eta: Eta value for the Langau function to use.
:type eta: float
:return params: Estimated initial fit parameters.
:rtype: set
"""
# Estimate MPV by mean
# (width of sensors is sufficiently big enough for a first guess at least)
mpv_guess = np.sum(self.bincenters * hist) / np.sum(hist)
self.log.debug("mpv guess: %s", mpv_guess)
# Estimate σ by FWHM (* some statistic stuff factor)
half_max = (np.max(hist) - np.min(hist)) / 2
extremum = hist.argmax()
nearest_above = np.abs(hist[extremum:-1] - half_max).argmin()
try:
nearest_below = np.abs(hist[0:extremum] - half_max).argmin()
except ValueError:
nearest_below = 0
fwhm = np.mean(self.bincenters[nearest_above + extremum]
) - np.mean(self.bincenters[nearest_below])
self.log.debug("half_max = %s, fwhm = %s",
half_max, fwhm)
# See https://en.wikipedia.org/wiki/Full_width_at_half_maximum
sigma_guess = fwhm / 2.355 # pylint: disable=invalid-name
# Estimate A (maximum of Landau distribution before convolution) by
# the maximum of the distribution
A_guess = np.max(hist) # pylint: disable=invalid-name
self.log.debug("A guess = %s", A_guess)
params = (mpv_guess, eta, sigma_guess, A_guess)
return params
def _params_gauss(self, hist):
"""Estimate initial parameters for a normal distribution.
:param hist: Values of the distribution
:type hist: :class:`np.ndarray`
:return params: Estimated initial fit parameters.
:rtype: set
"""
amp_guess = np.max(hist)
self.log.debug("amp guess: %s", amp_guess)
mean_guess = np.sum(self.bincenters * hist) / np.sum(hist)
self.log.debug("mean guess: %s", mean_guess)
# Estimate σ by FWHM (* some statistic stuff factor)
half_max = (np.max(hist) - np.min(hist)) / 2
extremum = hist.argmax()
nearest_above = np.abs(hist[extremum:-1] - half_max).argmin()
try:
nearest_below = np.abs(hist[0:extremum] - half_max).argmin()
except ValueError:
nearest_below = 0
fwhm = np.mean(self.bincenters[nearest_above + extremum]
) - np.mean(self.bincenters[nearest_below])
self.log.debug("half_max = %s, fwhm = %s",
half_max, fwhm)
# See https://en.wikipedia.org/wiki/Full_width_at_half_maximum
sigma_guess = fwhm / 2.355 # pylint: disable=invalid-name
params = (amp_guess, mean_guess, sigma_guess)
return params
# Several function definitions go here
def example_poly(x, coeff): # pylint: disable=invalid-name
""" No regards for Horner's rule. Everybody gangsta until polynomials are O(n²). """
return np.array([np.sum([c*val**i for i, c in enumerate(coeff)]) for val in x])
def gauss(xdata, amp, mean, sigma):
""" A simple normal distribution """
return amp * np.exp(np.divide(-(xdata - mean)**2, 2. * sigma**2))
"""
The `gain` module
=================
The `gain` module represents a gain scan run. A gain scan (or calibration) is performed to get the
conversion factor of the signal - being natively measured in analog-digital-converter-counts
(ADCs) - to electrons.
By injecting calibration pulses into all channels, their electrical behavior is characterized,
allowing abstract ADC counts to be converted into actual electrons.
"""
# pylint: disable=no-absolute-import
from __future__ import division
import numpy as np
import pandas as pd
from .raw import Raw
from .decorators import timeit