Source code for WaveformConstructor.series
import numpy as np
from .waveform import Off, Zero, WaveformFunctionBase, DigitalWaveformBase
from .default import DefaultValues
from typing import List, Tuple
[docs]class WaveformSeries(object):
WAVEFORM_TO_PAD = Zero()
"""Series of waveforms. Each waveform is defined by a waveform function and a time segment.
Parameters
----------
time_segments: list of float
A list of time segments of the waveforms.
waveform_functions: list of WaveformFunctionBase
A list of waveform functions. The length of the list must be the same as the time_segments.
Notes
-----
The waveform series is defined by a list of time segments and a list of waveform functions.
The waveform series is evaluated by evaluating the waveform functions segment by segment.
The total time of the waveform series is the sum of the time segments.
Raises
------
ValueError
If the length of the time_segments and the waveform_functions are not the same.
ValueError
If the time_segments contains negative values.
"""
def __init__(self, time_segments: List[float], waveform_functions: List[WaveformFunctionBase]):
if len(time_segments) != len(waveform_functions):
raise ValueError("length mismatch")
for t in time_segments:
if t < 0:
raise ValueError("Negative Time Value.")
self.time_segments = time_segments
self.waveform_funcs = waveform_functions
@property
def total_time(self) -> float:
"""Total duration of the waveform series"""
return sum(self.time_segments)
def _zeros_like(self, t: np.ndarray) -> np.ndarray:
"""Make a zero waveform like the given time series.
Parameters
----------
t: np.ndarray
Time series
Returns
-------
v: np.ndarray
Zero waveform
"""
return np.zeros_like(t)
[docs] def waveform(self, sampling_rate: int = None, start_time: float=0.0) -> Tuple[np.ndarray, np.ndarray]:
"""Evaluate the waveforms with a given sampling rate.
Parameters
----------
sampling_rate: float
Sampling rate of the AWG. If not specified, use the DefaultValues.sampling_rate.
start_time: float
Start time of the waveform series. If not specified, use 0.
Returns
-------
t: np.ndarray
Time series
v: np.ndarray
Voltage values
"""
sampling_rate = DefaultValues.sampling_rate if sampling_rate is None else sampling_rate
if not isinstance(sampling_rate, int):
raise TypeError("Must be an integer")
t = self.make_time_series(self.total_time, sampling_rate)
if start_time != 0.0:
t += start_time
v = self._zeros_like(t)
n_segments = len(self.time_segments)
cummulative_time = 0
# Evaluate the waveform function segment by segment
for i in range(n_segments):
cummulative_time += self.time_segments[i]
start_idx = 0 if i == 0 else stop_idx
stop_idx = None if i == n_segments - 1 else int(cummulative_time * sampling_rate) + 1
# If the waveform is not zero length, evaluate the waveform function
if stop_idx != start_idx:
t_segment = t[start_idx:stop_idx]
v[start_idx:stop_idx] = self.waveform_funcs[i](t_segment)
return t, v
[docs] @staticmethod
def make_time_series(t: float, sample_rate: float) -> np.ndarray:
"""Make a time series with a given total time and sampling rate.
Parameters
----------
t: float
Total time of the waveform series.
sample_rate: float
Sampling rate of the AWG.
Returns
-------
t: np.ndarray
Time series
"""
# Round the sample number to 32
n_sample = (t * sample_rate) // 32 * 32
return np.arange(n_sample) / sample_rate
[docs] def append(self, t: float, waveform_function: WaveformFunctionBase):
"""Append a new waveform."""
self.time_segments.append(t)
self.waveform_funcs.append(waveform_function)
[docs] def pad_to(self, new_total_time: float):
"""Pad zeros at the end. If the new total time is smaller than the current total time, raise an error.
Parameters
----------
new_total_time: float
Extend the waveform series to the new time.
Raises
------
ValueError
If the new total time is smaller than the current total time.
"""
total_time = self.total_time
if new_total_time < total_time:
raise ValueError("new_total_time smaller than total_time")
if new_total_time > total_time:
self.append(new_total_time - total_time, self.WAVEFORM_TO_PAD)
def __add__(self, other: 'WaveformSeries') -> 'WaveformSeries':
"""Concatenate two waveform series."""
if not isinstance(other, WaveformSeries):
raise TypeError
return WaveformSeries(
[*self.time_segments, *other.time_segments],
[*self.waveform_funcs, *other.waveform_funcs]
)
def __mul__(self, other: int) -> 'WaveformSeries':
"""Repeat the waveform series."""
if not isinstance(other, int):
raise TypeError
return WaveformSeries(
self.time_segments * other,
self.waveform_funcs * other
)
[docs] def Q(self) -> "WaveformSeries":
return WaveformSeries(self.time_segments, [wf.Q() for wf in self.waveform_funcs])
[docs]class DigitalWaveformSeries(WaveformSeries):
WAVEFORM_TO_PAD = Off()
def __init__(self, time_segments: List[float], waveform_functions: List[DigitalWaveformBase]):
for wf in waveform_functions:
if not isinstance(wf, DigitalWaveformBase):
raise TypeError("Must be a Digital Waveform")
super().__init__(time_segments, waveform_functions)
[docs] def append(self, t: float, waveform_function: DigitalWaveformBase):
"""Append a new waveform.
Parameters
----------
t: float
Time duration of the new waveform.
waveform_function: DigitalWaveformBase
Waveform function of the new waveform.
"""
if not isinstance(waveform_function, DigitalWaveformBase):
raise TypeError("Must be a Digital Waveform")
super().append(t, waveform_function)
def _zeros_like(self, t: np.ndarray) -> np.ndarray:
return np.zeros_like(t, dtype=bool)
def __add__(self, other: 'DigitalWaveformSeries') -> 'DigitalWaveformSeries':
"""Concatenate two waveform series."""
if not isinstance(other, DigitalWaveformSeries):
raise TypeError
return DigitalWaveformSeries(
[*self.time_segments, *other.time_segments],
[*self.waveform_funcs, *other.waveform_funcs]
)
def __mul__(self, other: int) -> 'DigitalWaveformSeries':
"""Repeat the waveform series."""
if not isinstance(other, int):
raise TypeError
return DigitalWaveformSeries(
self.time_segments * other,
self.waveform_funcs * other
)
[docs]class PointSumWaveformSeries:
"""Waveform series by adding the two waveform series point by point.
Parameters
----------
waveform_series1 : WaveformSeries
First waveform series.
waveform_series2 : WaveformSeries
Second waveform series.
Raises
------
ValueError
If the time_segments of the two waveform series are not equal.
"""
def __init__(self, waveform_series1: 'WaveformSeries', waveform_series2: 'WaveformSeries'):
if abs(waveform_series1.total_time - waveform_series2.total_time)>1/DefaultValues.sampling_rate:
raise ValueError("Total time of the two waveform series must be equal.")
self.waveform_series1 = waveform_series1
self.waveform_series2 = waveform_series2
@property
def total_time(self) -> float:
"""Total duration of the waveform series."""
return self.waveform_series1.total_time
[docs] def waveform(self, sampling_rate: int = None, start_time: float = 0.0) -> Tuple[np.ndarray, np.ndarray]:
"""Evaluate the waveforms with a given sampling rate.
Parameters
----------
sampling_rate : int, optional
Sampling rate of the AWG. If not specified, use the DefaultValues.sampling_rate.
start_time : float, optional
Start time of the waveform series. If not specified, use 0.
Returns
-------
t : np.ndarray
Time series
v : np.ndarray
Voltage values
"""
t1, v1 = self.waveform_series1.waveform(sampling_rate, start_time)
t2, v2 = self.waveform_series2.waveform(sampling_rate, start_time)
if not np.array_equal(t1, t2):
raise ValueError("Time series of the two waveform series must be equal.")
return t1, v1 + v2
[docs] @staticmethod
def maketimeseries( t: float, sample_rate: float) -> np.ndarray:
"""Make a time series with a given total time and sampling rate.
Parameters
----------
t : float
Total time of the waveform series.
sample_rate : float
Sampling rate of the AWG.
Returns
-------
t : np.ndarray
Time series
"""
return WaveformSeries.make_time_series(t, sample_rate)
[docs] def pad_to(self, new_total_time: float):
"""Pad zeros at the end. If the new total time is smaller than the current total time, raise an error.
Parameters
----------
new_total_time : float
Extend the waveform series to the new time.
Raises
------
ValueError
If the new total time is smaller than the current total time.
"""
total_time = self.total_time
if new_total_time < total_time:
raise ValueError("new_total_time smaller than total_time")
self.waveform_series1.pad_to(new_total_time)
self.waveform_series2.pad_to(new_total_time)