Source code for WaveformConstructor.series

import numpy as np
from .waveform import Off, Zero, WaveformFunctionBase, DigitalWaveformBase
from .default import DefaultValues
from typing import List, Tuple


class WaveformSeries(object):
    """Series of waveforms evaluated one after another.

    Each waveform is defined by a waveform function and the duration of the
    time segment it occupies.

    Parameters
    ----------
    time_segments: list of float
        Durations of the segments. The values are copied, so later changes
        to the caller's list do not affect the series.
    waveform_functions: list of WaveformFunctionBase
        Waveform functions, one per entry of ``time_segments``. The list is
        copied as well.

    Notes
    -----
    The series is evaluated by calling the waveform functions segment by
    segment. The total time of the series is the sum of the time segments.

    Raises
    ------
    ValueError
        If ``time_segments`` and ``waveform_functions`` differ in length.
    ValueError
        If ``time_segments`` contains a negative value.
    """

    # Waveform appended by pad_to() to fill the additional time.
    WAVEFORM_TO_PAD = Zero()

    def __init__(self, time_segments: List[float],
                 waveform_functions: List[WaveformFunctionBase]):
        if len(time_segments) != len(waveform_functions):
            raise ValueError("length mismatch")
        if any(t < 0 for t in time_segments):
            raise ValueError("Negative Time Value.")

        # Own private copies: append()/pad_to() mutate these lists in place,
        # and sharing them with the caller (or with a series produced by
        # Q()) would leave one object with mismatched list lengths.
        self.time_segments = list(time_segments)
        self.waveform_funcs = list(waveform_functions)

    @property
    def total_time(self) -> float:
        """Total duration of the waveform series."""
        return sum(self.time_segments)

    def _zeros_like(self, t: np.ndarray) -> np.ndarray:
        """Make a zero waveform like the given time series.

        Parameters
        ----------
        t: np.ndarray
            Time series

        Returns
        -------
        v: np.ndarray
            Zero waveform with the shape and dtype of ``t``.
        """
        return np.zeros_like(t)

    def waveform(self, sampling_rate: int = None,
                 start_time: float = 0.0) -> Tuple[np.ndarray, np.ndarray]:
        """Evaluate the waveforms with a given sampling rate.

        Parameters
        ----------
        sampling_rate: int
            Sampling rate of the AWG. If not specified, use the
            DefaultValues.sampling_rate. Python and NumPy integers are
            accepted.
        start_time: float
            Start time of the waveform series. If not specified, use 0.

        Returns
        -------
        t: np.ndarray
            Time series
        v: np.ndarray
            Voltage values

        Raises
        ------
        TypeError
            If the sampling rate is not an integer.
        """
        if sampling_rate is None:
            sampling_rate = DefaultValues.sampling_rate
        if not isinstance(sampling_rate, (int, np.integer)):
            raise TypeError("Must be an integer")

        t = self.make_time_series(self.total_time, sampling_rate)
        if start_time != 0.0:
            t += start_time
        v = self._zeros_like(t)

        n_segments = len(self.time_segments)
        elapsed = 0
        stop_idx = 0
        # Evaluate the waveform function segment by segment. Segment
        # boundaries are computed from the un-shifted elapsed time, while
        # the functions receive the (possibly shifted) time values.
        for i, (duration, func) in enumerate(
                zip(self.time_segments, self.waveform_funcs)):
            elapsed += duration
            # Each segment starts where the previous one stopped.
            start_idx = stop_idx
            if i == n_segments - 1:
                # The last segment takes every remaining sample.
                stop_idx = None
            else:
                stop_idx = int(elapsed * sampling_rate) + 1
            # If the waveform is not zero length, evaluate the function.
            if stop_idx != start_idx:
                v[start_idx:stop_idx] = func(t[start_idx:stop_idx])
        return t, v

    @staticmethod
    def make_time_series(t: float, sample_rate: float) -> np.ndarray:
        """Make a time series with a given total time and sampling rate.

        Parameters
        ----------
        t: float
            Total time of the waveform series.
        sample_rate: float
            Sampling rate of the AWG.

        Returns
        -------
        t: np.ndarray
            Time series starting at 0 with a spacing of ``1 / sample_rate``.
        """
        # Round the sample number down to a multiple of 32.
        # NOTE(review): t * sample_rate is a float product; a result that
        # lands just below a multiple of 32 drops a whole 32-sample block.
        n_sample = (t * sample_rate) // 32 * 32
        return np.arange(n_sample) / sample_rate

    def append(self, t: float, waveform_function: WaveformFunctionBase):
        """Append a new waveform.

        Parameters
        ----------
        t: float
            Time duration of the new waveform.
        waveform_function: WaveformFunctionBase
            Waveform function of the new waveform.

        Raises
        ------
        ValueError
            If ``t`` is negative.
        """
        # Keep the invariant enforced by __init__.
        if t < 0:
            raise ValueError("Negative Time Value.")
        self.time_segments.append(t)
        self.waveform_funcs.append(waveform_function)

    def pad_to(self, new_total_time: float):
        """Pad zeros at the end.

        Parameters
        ----------
        new_total_time: float
            Extend the waveform series to the new time. Nothing is appended
            when it equals the current total time.

        Raises
        ------
        ValueError
            If the new total time is smaller than the current total time.
        """
        total_time = self.total_time
        if new_total_time < total_time:
            raise ValueError("new_total_time smaller than total_time")
        if new_total_time > total_time:
            self.append(new_total_time - total_time, self.WAVEFORM_TO_PAD)

    def __add__(self, other: 'WaveformSeries') -> 'WaveformSeries':
        """Concatenate two waveform series into a new series."""
        if not isinstance(other, WaveformSeries):
            raise TypeError("Can only concatenate with a WaveformSeries")
        return WaveformSeries(
            [*self.time_segments, *other.time_segments],
            [*self.waveform_funcs, *other.waveform_funcs]
        )

    def __mul__(self, other: int) -> 'WaveformSeries':
        """Repeat the waveform series ``other`` times in a new series."""
        if not isinstance(other, int):
            raise TypeError("Repeat count must be an integer")
        return WaveformSeries(
            self.time_segments * other,
            self.waveform_funcs * other
        )

    def Q(self) -> "WaveformSeries":
        """Return a new series built from ``Q()`` of every waveform function.

        The time segments are copied so the returned series and this one
        can be appended to or padded independently.
        NOTE(review): presumably the quadrature counterpart of each
        waveform; confirm against WaveformFunctionBase.Q.
        """
        return WaveformSeries(
            list(self.time_segments),
            [wf.Q() for wf in self.waveform_funcs]
        )
class DigitalWaveformSeries(WaveformSeries):
    """Waveform series that only accepts digital waveform functions.

    Parameters
    ----------
    time_segments: list of float
        A list of time segments of the waveforms.
    waveform_functions: list of DigitalWaveformBase
        A list of digital waveform functions, one per time segment.

    Raises
    ------
    TypeError
        If any waveform function is not a DigitalWaveformBase.
    """

    # Waveform used by pad_to() to fill the additional time.
    WAVEFORM_TO_PAD = Off()

    def __init__(self, time_segments: List[float],
                 waveform_functions: List[DigitalWaveformBase]):
        all_digital = all(
            isinstance(func, DigitalWaveformBase)
            for func in waveform_functions
        )
        if not all_digital:
            raise TypeError("Must be a Digital Waveform")
        super().__init__(time_segments, waveform_functions)

    def append(self, t: float, waveform_function: DigitalWaveformBase):
        """Append a new waveform.

        Parameters
        ----------
        t: float
            Time duration of the new waveform.
        waveform_function: DigitalWaveformBase
            Waveform function of the new waveform.

        Raises
        ------
        TypeError
            If the waveform function is not a DigitalWaveformBase.
        """
        if isinstance(waveform_function, DigitalWaveformBase):
            super().append(t, waveform_function)
            return
        raise TypeError("Must be a Digital Waveform")

    def _zeros_like(self, t: np.ndarray) -> np.ndarray:
        """Make an all-False boolean waveform shaped like ``t``."""
        blank = np.zeros_like(t, dtype=bool)
        return blank

    def __add__(self, other: 'DigitalWaveformSeries') -> 'DigitalWaveformSeries':
        """Concatenate two digital waveform series into a new series."""
        if isinstance(other, DigitalWaveformSeries):
            durations = list(self.time_segments)
            durations.extend(other.time_segments)
            functions = list(self.waveform_funcs)
            functions.extend(other.waveform_funcs)
            return DigitalWaveformSeries(durations, functions)
        raise TypeError

    def __mul__(self, other: int) -> 'DigitalWaveformSeries':
        """Repeat the digital waveform series ``other`` times."""
        if isinstance(other, int):
            repeated_durations = self.time_segments * other
            repeated_functions = self.waveform_funcs * other
            return DigitalWaveformSeries(repeated_durations, repeated_functions)
        raise TypeError
class PointSumWaveformSeries:
    """Waveform series by adding the two waveform series point by point.

    Parameters
    ----------
    waveform_series1 : WaveformSeries
        First waveform series.
    waveform_series2 : WaveformSeries
        Second waveform series.

    Raises
    ------
    ValueError
        If the total times of the two waveform series differ by more than
        one sample period of DefaultValues.sampling_rate.
    """

    def __init__(self, waveform_series1: 'WaveformSeries',
                 waveform_series2: 'WaveformSeries'):
        # Total times are floats, so allow a mismatch of up to one sample.
        tolerance = 1 / DefaultValues.sampling_rate
        mismatch = abs(waveform_series1.total_time - waveform_series2.total_time)
        if mismatch > tolerance:
            raise ValueError("Total time of the two waveform series must be equal.")
        self.waveform_series1 = waveform_series1
        self.waveform_series2 = waveform_series2

    @property
    def total_time(self) -> float:
        """Total duration of the waveform series (taken from the first one)."""
        return self.waveform_series1.total_time

    def waveform(self, sampling_rate: int = None,
                 start_time: float = 0.0) -> Tuple[np.ndarray, np.ndarray]:
        """Evaluate the waveforms with a given sampling rate.

        Parameters
        ----------
        sampling_rate : int, optional
            Sampling rate of the AWG. Passed unchanged to both series, which
            apply their own default when it is None.
        start_time : float, optional
            Start time of the waveform series. If not specified, use 0.

        Returns
        -------
        t : np.ndarray
            Time series
        v : np.ndarray
            Point-by-point sum of the voltage values of the two series.

        Raises
        ------
        ValueError
            If the two series do not produce identical time series.
        """
        t1, v1 = self.waveform_series1.waveform(sampling_rate, start_time)
        t2, v2 = self.waveform_series2.waveform(sampling_rate, start_time)
        if not np.array_equal(t1, t2):
            raise ValueError("Time series of the two waveform series must be equal.")
        return t1, v1 + v2

    @staticmethod
    def make_time_series(t: float, sample_rate: float) -> np.ndarray:
        """Make a time series with a given total time and sampling rate.

        Delegates to WaveformSeries.make_time_series and carries the same
        name, so both series types expose the same interface.

        Parameters
        ----------
        t : float
            Total time of the waveform series.
        sample_rate : float
            Sampling rate of the AWG.

        Returns
        -------
        t : np.ndarray
            Time series
        """
        return WaveformSeries.make_time_series(t, sample_rate)

    @staticmethod
    def maketimeseries(t: float, sample_rate: float) -> np.ndarray:
        """Backward-compatible alias of make_time_series.

        Parameters
        ----------
        t : float
            Total time of the waveform series.
        sample_rate : float
            Sampling rate of the AWG.

        Returns
        -------
        t : np.ndarray
            Time series
        """
        return WaveformSeries.make_time_series(t, sample_rate)

    def pad_to(self, new_total_time: float):
        """Pad both waveform series at the end up to a new total time.

        Both series are checked before either one is modified, so a failed
        call leaves them untouched.

        Parameters
        ----------
        new_total_time : float
            Extend the waveform series to the new time.

        Raises
        ------
        ValueError
            If the new total time is smaller than the current total time of
            either series.
        """
        # The two totals may differ by up to one sample period; validate
        # against the longer one so neither pad_to() call below can fail
        # after the other series has already been padded.
        longest = max(self.waveform_series1.total_time,
                      self.waveform_series2.total_time)
        if new_total_time < longest:
            raise ValueError("new_total_time smaller than total_time")
        self.waveform_series1.pad_to(new_total_time)
        self.waveform_series2.pad_to(new_total_time)