Source code for WaveformConstructor.util

import numpy as np
import numba as nb
from typing import List, Tuple, Callable
from concurrent.futures import ThreadPoolExecutor

from .series import WaveformSeries, DigitalWaveformSeries, DigitalWaveformBase
from .waveform import WaveformFunctionBase, Zero, Off
from .default import DefaultValues


@nb.jit(nopython=True, nogil=True)
def to_int16(v_float: np.ndarray) -> np.ndarray:
    """Scale a floating-point waveform array and cast it to int16.

    Every sample is multiplied by ``2 ** 15 - 1`` (32767) and the result is
    cast to ``np.int16``. No clipping is performed here.

    Parameters
    ----------
    v_float: np.ndarray
        Waveform samples to convert.

    Returns
    -------
    v_int16: np.ndarray
        The scaled samples as an int16 array.
    """
    full_scale = 2 ** 15 - 1
    scaled = v_float * full_scale
    return scaled.astype(np.int16)
def sync(*waveform_series_collection: "WaveformSeries") -> float:
    """Pad the waveform series to the time of the longest waveform series.

    Each series is padded in place through its ``pad_to`` method.

    Parameters
    ----------
    waveform_series_collection: list of WaveformSeries
        The waveform series to synchronize, passed as positional arguments.

    Returns
    -------
    time: float
        The time after synchronization, i.e. the largest ``total_time``
        among the given series. ``0.0`` when no series is given.
    """
    # ``default`` keeps the call well-defined when there is nothing to
    # synchronize (e.g. a composer with zero channels) instead of letting
    # ``max`` raise ValueError on an empty sequence.
    max_time = max(
        (series.total_time for series in waveform_series_collection),
        default=0.0,
    )
    for series in waveform_series_collection:
        series.pad_to(max_time)
    return max_time
class MultiChannelComposer(object):
    """A helper class for composing multi-channel waveforms that requires synchronization.

    Parameters
    ----------
    n_channel: int
        Number of channels.
    digital: iterable of int, Optional
        Indices of the channels that are created as ``DigitalWaveformSeries``.
        All other channels are created as ``WaveformSeries``. Defaults to no
        digital channels.

    Attributes
    ----------
    channel: list of WaveformSeries
        A list of waveform series for each channel.
    """

    def __init__(self, n_channel: int, digital=()) -> None:
        # Immutable default instead of a shared mutable list; the set gives
        # a constant-time membership test per channel.
        digital_channels = set(digital)
        self.channel = [
            DigitalWaveformSeries([], []) if i in digital_channels else WaveformSeries([], [])
            for i in range(n_channel)
        ]

    def sync(self) -> float:
        """Synchronize the waveforms in all channels.

        The waveforms in all channels will be padded to the time of the
        longest waveform series.

        Returns
        -------
        time: float
            The time after synchronization.
        """
        return sync(*self.channel)

    def append_series(self, channel: int, waveform_series: WaveformSeries, sync_after: bool = False):
        """Append a waveform series to one channel.

        The channel entry is replaced by ``self.channel[channel] + waveform_series``.

        Parameters
        ----------
        channel: int
            Index of the channel to append to.
        waveform_series: WaveformSeries
            The series to append.
        sync_after: bool, Optional
            If True, synchronize all channels after appending.
        """
        self.channel[channel] = self.channel[channel] + waveform_series
        if sync_after:
            self.sync()

    def append_waveform(self, channel: int, waveform: WaveformFunctionBase, time: float, sync_after: bool = False):
        """Append a single waveform to one channel.

        The waveform is wrapped in a one-element series of the same kind as
        the target channel (digital or not) before being appended.

        Parameters
        ----------
        channel: int
            Index of the channel to append to.
        waveform: WaveformFunctionBase
            The waveform to append.
        time: float
            The time associated with the waveform in the new series.
        sync_after: bool, Optional
            If True, synchronize all channels after appending.
        """
        if isinstance(self.channel[channel], DigitalWaveformSeries):
            to_append = DigitalWaveformSeries([time], [waveform])
        else:
            to_append = WaveformSeries([time], [waveform])
        self.channel[channel] = self.channel[channel] + to_append
        if sync_after:
            self.sync()

    def mute(self, channel: int, time: float, sync_after: bool = False):
        """Append a silent waveform to one channel.

        Digital channels receive ``Off()``; all other channels receive ``Zero()``.

        Parameters
        ----------
        channel: int
            Index of the channel to mute.
        time: float
            The time associated with the silent waveform.
        sync_after: bool, Optional
            If True, synchronize all channels after appending.
        """
        waveform = Off() if isinstance(self.channel[channel], DigitalWaveformSeries) else Zero()
        self.append_waveform(channel, waveform, time, sync_after)

    def mute_all(self, time: float):
        """Append a silent waveform of the given time to every channel.

        Channels are not synchronized by this call.

        Parameters
        ----------
        time: float
            The time associated with the silent waveform on each channel.
        """
        for channel in range(len(self.channel)):
            self.mute(channel, time)
class WaveformFactory(object):
    """Tools for evaluating waveforms with multi-threading.

    Parameters
    ----------
    waveform_series_collection: list of WaveformConstructor.series.WaveformSeries
        A list of waveform series.
    sampling_rate: int, Optional
        Sampling rate of the AWG. If not specified, use the
        DefaultValues.sampling_rate.
    start_time: float, Optional
        Forwarded as the second argument to each series' ``waveform`` call.
    """

    def __init__(self, waveform_series_collection: List["WaveformSeries"], sampling_rate=None, start_time=0.0):
        if sampling_rate is None:
            sampling_rate = DefaultValues.sampling_rate
        self.sampling_rate = sampling_rate
        self.waveform_series_collection = waveform_series_collection
        self.start_time = start_time

    def _process(self, process_input: Tuple[int, "WaveformSeries"]):
        """Evaluate one ``(index, waveform_series)`` pair.

        Returns the index together with the int16 conversion of element 1 of
        the value returned by ``waveform_series.waveform``.
        """
        index, waveform_series = process_input
        waveform_array = waveform_series.waveform(self.sampling_rate, self.start_time)[1]
        return index, to_int16(waveform_array)

    def pipeline(self, max_workers=None):
        """Evaluate the waveforms with multi-threading.

        All series are submitted to a thread pool when this method is called;
        results are yielded in the order of ``waveform_series_collection``.

        Parameters
        ----------
        max_workers: int, Optional
            Maximum number of worker threads, passed to ``ThreadPoolExecutor``.

        Returns
        -------
        generator
            A generator of the waveform index and the waveform array.
        """
        executor = ThreadPoolExecutor(max_workers=max_workers)
        try:
            return executor.map(self._process, enumerate(self.waveform_series_collection))
        finally:
            # Every task has been submitted by ``map`` at this point. A
            # non-blocking shutdown lets the queued work finish and then
            # releases the worker threads, instead of leaving the pool open.
            executor.shutdown(wait=False)
class WaveformFactoryDigital(WaveformFactory):
    """Tools for evaluating digital waveforms with multi-threading.

    Unlike the base class, the evaluated array is returned as-is (no int16
    conversion) and must have a boolean dtype.
    """

    def _process(self, process_input: Tuple[int, WaveformSeries]):
        """Evaluate one ``(index, waveform_series)`` pair.

        Raises
        ------
        TypeError
            If the evaluated waveform array does not have dtype ``bool``.
        """
        channel_index, series = process_input
        evaluated = series.waveform(self.sampling_rate, self.start_time)
        samples = evaluated[1]
        if samples.dtype != bool:
            raise TypeError("The waveform series must be digital.")
        return channel_index, samples
class WaveformFactoryIQ(WaveformFactory):
    """Tools for evaluating IQ waveforms stream with multi-threading.

    Parameters
    ----------
    waveform_series_collection: list of WaveformSeries
        A list of waveform series.
    sampling_rate: int, Optional
        Passed through to ``WaveformFactory``.
    start_time: float, Optional
        Passed through to ``WaveformFactory``.
    q_delay: float, Optional
        Added to ``start_time`` when evaluating the series returned by
        ``waveform_series.Q()``.
    """

    def __init__(self, waveform_series_collection: List[WaveformSeries], sampling_rate=None, start_time=0.0, q_delay=0.0):
        super().__init__(waveform_series_collection, sampling_rate, start_time)
        self.q_delay = q_delay

    def _process(self, process_input: Tuple[int, WaveformSeries]):
        """Evaluate one ``(index, waveform_series)`` pair into I and Q arrays.

        Returns the index, the int16 I array and the int16 Q array.
        """
        channel_index, series = process_input
        rate = self.sampling_rate

        # I is evaluated at start_time, Q at start_time shifted by q_delay.
        samples_i = series.waveform(rate, self.start_time)[1]
        q_start = self.start_time + self.q_delay
        samples_q = series.Q().waveform(rate, q_start)[1]

        return channel_index, to_int16(samples_i), to_int16(samples_q)