Source code for dwfpy.analog_recorder

"""
Recorder for Analog Input data.
"""

#
# This file is part of dwfpy: https://github.com/mariusgreuel/dwfpy
# Copyright (C) 2019 Marius Greuel
#
# SPDX-License-Identifier: MIT
#

import ctypes
from typing import Callable, Tuple, List, Optional
import numpy as np
from . import bindings as api
from . import analog_input as fwd  # pylint: disable=unused-import
from .constants import Status


[docs]class AnalogRecorder: """Recorder for Analog Input data"""
[docs] class ChannelData: """Represents the acquired data of a channel.""" def __init__(self): self._data_samples = None @property def data_samples(self) -> tuple: """Gets the acquired data samples.""" return self._data_samples
def __init__(self, module: 'fwd.AnalogInput'): self._module = module self._is_setup = False self._buffer_size = 0 self._buffer_index = 0 self._data_buffers: List[Optional[ctypes.Array]] = [] self._status = Status.READY self._requested_samples = 0 self._total_samples = 0 self._lost_samples = 0 self._corrupted_samples = 0 self._channels = tuple(self.ChannelData() for _ in module.channels) @property def status(self) -> Status: """Gets the last acquisition status.""" return self._status @property def requested_samples(self) -> int: """Gets the number of requested samples for recording.""" return self._requested_samples @property def total_samples(self) -> int: """Gets the total number of acquired and lost samples.""" return self._total_samples @property def lost_samples(self) -> int: """Gets the number of lost samples.""" return self._lost_samples @property def corrupted_samples(self) -> int: """Gets the number of corrupted samples.""" return self._corrupted_samples @property def channels(self) -> Tuple[ChannelData, ...]: """Gets a collection of data channels.""" return self._channels
[docs] def record(self, callback: Optional[Callable[['AnalogRecorder'], bool]] = None) -> None: """Starts the recording and processes all samples until the recording is complete. Parameters ---------- callback : function A user-defined function that is called every time a data chunk is processed. Return True to continue recording, False to abort the recording. Notes ----- This function blocks until the recording is complete. """ if not self._is_setup: self._setup_recording() if self._is_setup: self._module.configure(start=True) if self._is_setup: while True: again_status = self._process_recording() again_user = callback(self) if callback is not None else True if not again_status or not again_user: break self._finalize_recording()
[docs] def process(self) -> bool: """Checks the instrument status and processes a chunk of data if available. Returns ------- bool If True, then if there is more data to process, and the function must be called again. If False, the recording is complete, and you must stop calling this function. Notes ----- This function must be called repeatedly by the user to process the recording data. Failure to call this function in a timely manner will cause samples to get lost or corrupted. """ if not self._is_setup: self._setup_recording() if self._is_setup: self._module.configure(start=True) if self._is_setup: again = self._process_recording() if not again: self._finalize_recording() return again return False
def _setup_recording(self) -> None: self._channels = tuple(self.ChannelData() for _ in self._module.channels) self._buffer_size = round(self._module.record_length * self._module.frequency) if self._buffer_size > 0: self._requested_samples = self._buffer_size self._buffer_index = 0 self._data_buffers = [] for channel in self._module.channels: self._data_buffers.append( (ctypes.c_double * self._buffer_size)() if channel.enabled else None ) self._is_setup = True def _process_recording(self) -> bool: self._status = self._module.read_status(read_data=True) available_samples, lost_samples, corrupted_samples = self._module.record_status self._buffer_index += lost_samples self._buffer_index %= self._buffer_size self._total_samples += lost_samples self._total_samples += available_samples self._lost_samples += lost_samples self._corrupted_samples += corrupted_samples sample_index = 0 while available_samples > 0: chunk_size = available_samples if self._buffer_index + chunk_size > self._buffer_size: chunk_size = self._buffer_size - self._buffer_index for i, data_buffer in enumerate(self._data_buffers): if data_buffer is not None: api.dwf_analog_in_status_data2( self._module.device.handle, i, self._byref_double(data_buffer, self._buffer_index), sample_index, chunk_size, ) self._buffer_index += chunk_size self._buffer_index %= self._buffer_size sample_index += chunk_size available_samples -= chunk_size return self._status != Status.DONE def _finalize_recording(self) -> None: for i, data_buffer in enumerate(self._data_buffers): if data_buffer is not None: # pylint: disable-next=protected-access self._channels[i]._data_samples = self._normalize_ring_buffer( data_buffer, self._buffer_index ) self._is_setup = False @staticmethod def _byref_double(c_buffer, index): pointer = ctypes.byref(c_buffer, index * ctypes.sizeof(ctypes.c_double)) return ctypes.cast(pointer, ctypes.POINTER(ctypes.c_double)) @staticmethod def _normalize_ring_buffer(buffer: ctypes.Array, index: int): array = np.array(buffer) return array if index == 0 else np.concatenate([array[index:], array[:index]])