"""
Recorder for Digital Input data.
"""
#
# This file is part of dwfpy: https://github.com/mariusgreuel/dwfpy
# Copyright (C) 2019 Marius Greuel
#
# SPDX-License-Identifier: MIT
#
import ctypes
from typing import Callable, Optional
import numpy as np
from . import bindings as api
from . import digital_input as fwd # pylint: disable=unused-import
from .constants import DigitalInputSampleMode, Status
[docs]class DigitalRecorder:
"""Recorder for Digital Input data"""
def __init__(self, module: 'fwd.DigitalInput'):
self._module = module
self._is_setup = False
self._sample_size = 0
self._buffer_size = 0
self._buffer_index = 0
self._data_buffer: Optional[ctypes.Array] = None
self._noise_buffer: Optional[ctypes.Array] = None
self._status = Status.READY
self._acquire_noise = False
self._requested_samples = 0
self._total_samples = 0
self._lost_samples = 0
self._corrupted_samples = 0
self._data_samples: tuple = ()
self._noise_samples: tuple = ()
@property
def status(self) -> Status:
"""Gets the last acquisition status."""
return self._status
@property
def requested_samples(self) -> int:
"""Gets the number of requested samples for recording."""
return self._requested_samples
@property
def total_samples(self) -> int:
"""Gets the total number of acquired and lost samples."""
return self._total_samples
@property
def lost_samples(self) -> int:
"""Gets the number of lost samples."""
return self._lost_samples
@property
def corrupted_samples(self) -> int:
"""Gets the number of corrupted samples."""
return self._corrupted_samples
@property
def data_samples(self) -> tuple:
"""Gets the acquired data samples."""
return self._data_samples
@property
def noise_samples(self) -> tuple:
"""Gets the acquired noise samples."""
return self._noise_samples
[docs] def record(self, callback: Optional[Callable[['DigitalRecorder'], bool]] = None) -> None:
"""Starts the recording and processes all samples until the recording is complete.
Parameters
----------
callback : function
A user-defined function that is called every time a data chunk is processed.
Return True to continue recording, False to abort the recording.
Notes
-----
This function blocks until the recording is complete.
"""
if not self._is_setup:
self._setup_recording()
if self._is_setup:
self._module.configure(start=True)
if self._is_setup:
while True:
again_status = self._process_recording()
again_user = callback(self) if callback is not None else True
if not again_status or not again_user:
break
self._finalize_recording()
[docs] def process(self) -> bool:
"""Checks the instrument status and processes a chunk of data if available.
Returns
-------
bool
If True, then if there is more data to process, and the function must be called again.
If False, the recording is complete, and you must stop calling this function.
Notes
-----
This function must be called repeatedly by the user to process the recording data.
Failure to call this function in a timely manner
will cause samples to get lost or corrupted.
"""
if not self._is_setup:
self._setup_recording()
if self._is_setup:
self._module.configure(start=True)
if self._is_setup:
again = self._process_recording()
if not again:
self._finalize_recording()
return again
return False
def _setup_recording(self) -> None:
self._acquire_noise = self._module.sample_mode == DigitalInputSampleMode.NOISE
self._requested_samples = 0
self._data_samples = ()
self._noise_samples = ()
self._buffer_size = self._module.trigger.prefill + self._module.trigger.position
if self._buffer_size > 0:
self._requested_samples = self._buffer_size
self._buffer_index = 0
if self._module.sample_format <= 8:
self._sample_size = ctypes.sizeof(ctypes.c_uint8)
self._data_buffer = (ctypes.c_uint8 * self._buffer_size)()
self._noise_buffer = (
(ctypes.c_uint8 * self._buffer_size)() if self._acquire_noise else None
)
elif self._module.sample_format <= 16:
self._sample_size = ctypes.sizeof(ctypes.c_uint16)
self._data_buffer = (ctypes.c_uint16 * self._buffer_size)()
self._noise_buffer = (
(ctypes.c_uint16 * self._buffer_size)() if self._acquire_noise else None
)
elif self._module.sample_format <= 32:
self._sample_size = ctypes.sizeof(ctypes.c_uint32)
self._data_buffer = (ctypes.c_uint32 * self._buffer_size)()
self._noise_buffer = (
(ctypes.c_uint32 * self._buffer_size)() if self._acquire_noise else None
)
else:
raise ValueError('sample_format must be 8, 16, or 32.')
self._is_setup = True
def _process_recording(self) -> bool:
self._status = self._module.read_status(read_data=True)
available_samples, lost_samples, corrupted_samples = self._module.record_status
self._buffer_index += lost_samples
self._buffer_index %= self._buffer_size
self._total_samples += lost_samples
self._total_samples += available_samples
self._lost_samples += lost_samples
self._corrupted_samples += corrupted_samples
sample_index = 0
while available_samples > 0:
chunk_size = available_samples
if self._buffer_index + chunk_size > self._buffer_size:
chunk_size = self._buffer_size - self._buffer_index
if self._data_buffer is not None:
api.dwf_digital_in_status_data2(
self._module.device.handle,
ctypes.byref(self._data_buffer, self._buffer_index * self._sample_size),
sample_index,
chunk_size * self._sample_size,
)
if self._noise_buffer is not None:
api.dwf_digital_in_status_noise2(
self._module.device.handle,
ctypes.byref(self._noise_buffer, self._buffer_index * self._sample_size),
sample_index,
chunk_size * self._sample_size,
)
self._buffer_index += chunk_size
self._buffer_index %= self._buffer_size
sample_index += chunk_size
available_samples -= chunk_size
return self._status != Status.DONE
def _finalize_recording(self) -> None:
if self._data_buffer is not None:
self._data_samples = self._normalize_ring_buffer(self._data_buffer, self._buffer_index)
self._data_buffer = None
if self._noise_buffer is not None:
self._noise_samples = self._normalize_ring_buffer(
self._noise_buffer, self._buffer_index
)
self._noise_buffer = None
self._is_setup = False
@staticmethod
def _normalize_ring_buffer(buffer: ctypes.Array, index: int):
array = np.array(buffer)
return array if index == 0 else np.concatenate([array[index:], array[:index]])
[docs] def stream(self, callback: Callable[['DigitalRecorder'], bool]) -> None:
"""Starts the streaming.
Parameters
----------
callback : function
A user-defined function that is called every time a data chunk is processed.
Return True to continue streaming, False to stop the streaming.
"""
if not self._is_setup:
self._setup_streaming()
if self._is_setup:
self._module.configure(start=True)
if self._is_setup:
while True:
again_status = self._update_streaming()
again_user = callback(self)
if not again_status or not again_user:
break
self._finalize_streaming()
def _setup_streaming(self) -> None:
self._acquire_noise = self._module.sample_mode == DigitalInputSampleMode.NOISE
self._requested_samples = 0
self._buffer_size = 0
self._data_samples = ()
self._noise_samples = ()
self._is_setup = True
def _update_streaming(self) -> bool:
self._status = self._module.read_status(read_data=True)
available_samples, lost_samples, corrupted_samples = self._module.record_status
self._total_samples += lost_samples
self._total_samples += available_samples
self._lost_samples += lost_samples
self._corrupted_samples += corrupted_samples
self._data_samples = self._module.get_data(sample_count=available_samples)
if self._acquire_noise:
self._noise_samples = self._module.get_noise(sample_count=available_samples)
return self._status != Status.DONE
def _finalize_streaming(self) -> None:
self._status = Status.DONE
self._is_setup = False