Source code for dwfpy.protocols

"""
Protocols module for Digilent WaveForms devices.
"""

#
# This file is part of dwfpy: https://github.com/mariusgreuel/dwfpy
# Copyright (C) 2019 Marius Greuel
#
# SPDX-License-Identifier: MIT
#

import array
import ctypes
from typing import Optional, Tuple, Union
from . import bindings as api
from .constants import DigitalOutputIdle
from .helpers import Helpers


[docs]class Protocols: """Digital Protocols module."""
[docs] class Uart: """UART protocol.""" def __init__(self, device): self._device = device self._pin_tx = None self._pin_rx = None self._rate = None self._data_bits = None self._stop_bits = None self._parity = None self._inverted = None @property def pin_rx(self) -> Optional[int]: """Gets or sets the DIO channel to use for reception.""" return self._pin_rx @pin_rx.setter def pin_rx(self, value: int) -> None: self._pin_rx = value api.dwf_digital_uart_rx_set(self._device.handle, value) @property def pin_tx(self) -> Optional[int]: """Gets or sets the DIO channel to use for transmission.""" return self._pin_tx @pin_tx.setter def pin_tx(self, value: int) -> None: self._pin_tx = value api.dwf_digital_uart_tx_set(self._device.handle, value) @property def rate(self) -> float: """Gets or sets the baud rate.""" return self._rate @rate.setter def rate(self, value: float) -> None: self._rate = value api.dwf_digital_uart_rate_set(self._device.handle, value) @property def data_bits(self) -> Optional[int]: """Gets or sets the number of data bits.""" return self._data_bits @data_bits.setter def data_bits(self, value: int) -> None: self._data_bits = value api.dwf_digital_uart_bits_set(self._device.handle, value) @property def stop_bits(self) -> float: """Gets or sets the number of stop bits.""" return self._stop_bits @stop_bits.setter def stop_bits(self, value: float) -> None: self._stop_bits = value api.dwf_digital_uart_stop_set(self._device.handle, value) @property def parity(self) -> str: """Gets or sets the parity.""" return self._parity @parity.setter def parity(self, value: str) -> None: self._parity = value api.dwf_digital_uart_parity_set(self._device.handle, self._map_parity(value)) @property def inverted(self) -> bool: """Gets or sets the polarity.""" return self._inverted @inverted.setter def inverted(self, value: bool) -> None: self._inverted = value api.dwf_digital_uart_polarity_set(self._device.handle, value)
[docs] def reset(self) -> None: """Resets the UART configuration to default value.""" api.dwf_digital_uart_reset(self._device.handle) self._pin_tx = None self._pin_rx = None self._rate = None self._data_bits = None self._stop_bits = None self._parity = None self._inverted = None
[docs] def setup( self, pin_rx: Optional[int] = None, pin_tx: Optional[int] = None, rate: int = 9600, data_bits: int = 8, stop_bits: int = 1, parity: str = 'n', inverted: bool = False, ) -> None: """Sets up the UART configuration.""" if pin_rx is not None: self.pin_rx = pin_rx if pin_tx is not None: self.pin_tx = pin_tx if rate is not None: self.rate = rate if data_bits is not None: self.data_bits = data_bits if stop_bits is not None: self.stop_bits = stop_bits if parity is not None: self.parity = parity if inverted is not None: self.inverted = inverted if pin_rx is not None: api.dwf_digital_uart_rx(self._device.handle, None, 0) if pin_tx is not None: api.dwf_digital_uart_tx(self._device.handle, None, 0)
[docs] def read(self, buffer_size=8192) -> Tuple[bytes, int]: """Returns the received characters since the last call. Returns (rx_buffer, parity).""" rx_buffer8 = (ctypes.c_char * buffer_size)() count, parity = api.dwf_digital_uart_rx( self._device.handle, rx_buffer8, len(rx_buffer8) ) return bytes(rx_buffer8)[:count], parity
[docs] def write(self, buffer: bytes) -> None: """Transmits the specified characters.""" tx_buffer8 = (ctypes.c_char * len(buffer)).from_buffer_copy(buffer) api.dwf_digital_uart_tx(self._device.handle, tx_buffer8, len(tx_buffer8))
@staticmethod def _map_parity(value) -> int: named_values = { 'n': 0, 'no': 0, 'o': 1, 'odd': 1, 'e': 2, 'even': 2, } return Helpers.map_named_value(value, named_values)
[docs] class Spi: """SPI protocol.""" def __init__(self, device): self._device = device self._dq_mode = 1 # MOSI/MISO self._pin_clock = None self._pin_select = None self._pin_dq0 = None self._pin_dq1 = None self._pin_dq2 = None self._pin_dq3 = None self._frequency = None self._mode = None self._msb_first = None @property def pin_clock(self) -> int: """Gets or sets the DIO channel to use for SPI clock.""" return self._pin_clock @pin_clock.setter def pin_clock(self, value: int) -> None: self._pin_clock = value api.dwf_digital_spi_clock_set(self._device.handle, value) @property def pin_select(self) -> int: """Gets or sets the DIO channel to use for SPI clock.""" return self._pin_select @pin_select.setter def pin_select(self, value: int) -> None: self._pin_select = value @property def pin_dq0(self) -> int: """Gets or sets the DIO channel to use for SPI data.""" return self._pin_dq0 @pin_dq0.setter def pin_dq0(self, value: int) -> None: self._pin_dq0 = value api.dwf_digital_spi_data_set(self._device.handle, 0, value) @property def pin_dq1(self) -> int: """Gets or sets the DIO channel to use for SPI data.""" return self._pin_dq1 @pin_dq1.setter def pin_dq1(self, value: int) -> None: self._pin_dq1 = value api.dwf_digital_spi_data_set(self._device.handle, 1, value) @property def pin_dq2(self) -> int: """Gets or sets the DIO channel to use for SPI data.""" return self._pin_dq2 @pin_dq2.setter def pin_dq2(self, value: int) -> None: self._pin_dq2 = value api.dwf_digital_spi_data_set(self._device.handle, 2, value) @property def pin_dq3(self) -> int: """Gets or sets the DIO channel to use for SPI data.""" return self._pin_dq3 @pin_dq3.setter def pin_dq3(self, value: int) -> None: self._pin_dq3 = value api.dwf_digital_spi_data_set(self._device.handle, 3, value) @property def frequency(self) -> float: """Gets or sets the DIO channel to use for SPI data.""" return self._frequency @frequency.setter def frequency(self, value: float) -> None: self._frequency = value api.dwf_digital_spi_frequency_set(self._device.handle, value) @property def mode(self) -> int: """Gets or sets the SPI mode.""" return self._mode @mode.setter def mode(self, value: int) -> None: self._mode = value api.dwf_digital_spi_mode_set(self._device.handle, value) @property def msb_first(self) -> bool: """Gets or sets the bit order for SPI data.""" return self._msb_first @msb_first.setter def msb_first(self, value: bool) -> None: self._msb_first = value api.dwf_digital_spi_order_set(self._device.handle, value)
[docs] def reset(self) -> None: """Resets the SPI configuration to default value.""" api.dwf_digital_spi_reset(self._device.handle) self._pin_clock = None self._pin_select = None self._pin_dq0 = None self._pin_dq1 = None self._pin_dq2 = None self._pin_dq3 = None self._frequency = 1000.0 self._mode = None self._msb_first = None
[docs] def setup( self, pin_clock: int, pin_mosi: int, pin_miso: Optional[int] = None, pin_select: Optional[int] = None, frequency: Optional[float] = None, mode: int = 0, msb_first: bool = True, ) -> None: """Sets up the SPI pin configuration in standard mode.""" self._dq_mode = 1 # MOSI/MISO self.pin_clock = pin_clock if pin_mosi is not None: self.pin_dq0 = pin_mosi if pin_miso is not None: self.pin_dq1 = pin_miso if pin_select is not None: self.pin_select = pin_select self._setup_config(frequency, mode, msb_first)
[docs] def setup_three_wire( self, pin_clock: int, pin_siso: int, pin_select: int = None, frequency: float = None, mode: int = 0, msb_first: bool = True, ) -> None: """Sets up the SPI pin configuration in Three-wire mode.""" self._dq_mode = 0 # SISO self.pin_clock = pin_clock self.pin_dq0 = pin_siso if pin_select is not None: self.pin_select = pin_select self._setup_config(frequency, mode, msb_first)
[docs] def setup_dual( self, pin_clock: int, pin_dq0: int, pin_dq1: int, pin_select: int = None, frequency: float = None, mode: int = 0, msb_first: bool = True, ) -> None: """Sets up the SPI pin configuration in Dual mode.""" self._dq_mode = 2 # DUAL self.pin_clock = pin_clock if pin_dq0 is not None: self.pin_dq0 = pin_dq0 if pin_dq1 is not None: self.pin_dq1 = pin_dq1 if pin_select is not None: self.pin_select = pin_select self._setup_config(frequency, mode, msb_first)
[docs] def setup_quad( self, pin_clock: int, pin_dq0: int, pin_dq1: int, pin_dq2: int, pin_dq3: int, pin_select: int = None, frequency: float = None, mode: int = 0, msb_first: bool = True, ) -> None: """Sets up the SPI pin configuration in Quad mode.""" self._dq_mode = 3 # QUAD self.pin_clock = pin_clock if pin_dq0 is not None: self.pin_dq0 = pin_dq0 if pin_dq1 is not None: self.pin_dq1 = pin_dq1 if pin_dq2 is not None: self.pin_dq2 = pin_dq2 if pin_dq3 is not None: self.pin_dq3 = pin_dq3 if pin_select is not None: self.pin_select = pin_select self._setup_config(frequency, mode, msb_first)
def _setup_config(self, frequency, mode, msb_first) -> None: if frequency is not None: self.frequency = frequency if mode is not None: self.mode = mode if msb_first is not None: self.msb_first = msb_first
[docs] def set_idle(self, pin: int, idle: DigitalOutputIdle) -> None: """Specifies the DQ signal idle output state. DQ2 and DQ3 may be used for alternative purpose like for write protect (should be driven low) or for hold (should be in high impendance).""" api.dwf_digital_spi_idle_set(self._device.handle, pin, idle)
[docs] def select(self, level: Union[str, int], pin_select: Optional[int] = None) -> None: """Control the SPI chip select.""" if pin_select is None: pin_select = self.pin_select api.dwf_digital_spi_select( self._device.handle, pin_select, self._map_select_level(level) )
[docs] def read_one(self, dq_mode: Optional[int] = None, bits_per_word: int = 8) -> None: """Performs a SPI reception of up to 32 bits.""" if dq_mode is None: dq_mode = self._dq_mode return api.dwf_digital_spi_read_one(self._device.handle, dq_mode, bits_per_word)
[docs] def write_one( self, data: int, dq_mode: Optional[int] = None, bits_per_word: int = 8 ) -> None: """Performs a SPI transmit of up to 32 bits.""" if dq_mode is None: dq_mode = self._dq_mode return api.dwf_digital_spi_write_one(self._device.handle, dq_mode, bits_per_word, data)
[docs] def read( self, words_to_receive: int, dq_mode: Optional[int] = None, bits_per_word: int = 8 ) -> Union[bytes, array.array]: """Performs a SPI read.""" if dq_mode is None: dq_mode = self._dq_mode if bits_per_word <= 8: rx_buffer8 = (ctypes.c_ubyte * words_to_receive)() api.dwf_digital_spi_read( self._device.handle, dq_mode, bits_per_word, rx_buffer8, len(rx_buffer8) ) return bytes(rx_buffer8) elif bits_per_word <= 16: rx_buffer16 = (ctypes.c_ushort * words_to_receive)() api.dwf_digital_spi_read16( self._device.handle, dq_mode, bits_per_word, rx_buffer16, len(rx_buffer16) ) return array.array('H', rx_buffer16) elif bits_per_word <= 32: rx_buffer32 = (ctypes.c_uint * words_to_receive)() api.dwf_digital_spi_read32( self._device.handle, dq_mode, bits_per_word, rx_buffer32, len(rx_buffer32) ) return array.array('I', rx_buffer32) else: raise ValueError('bits_per_word cannot be higher than 32.')
[docs] def write( self, buffer: bytes, dq_mode: Optional[int] = None, bits_per_word: int = 8 ) -> None: """Performs a SPI write.""" if dq_mode is None: dq_mode = self._dq_mode if bits_per_word <= 8: tx_buffer8 = (ctypes.c_ubyte * len(buffer)).from_buffer_copy(buffer) api.dwf_digital_spi_write( self._device.handle, dq_mode, bits_per_word, tx_buffer8, len(tx_buffer8) ) elif bits_per_word <= 16: tx_buffer16 = (ctypes.c_ushort * len(buffer)).from_buffer_copy(buffer) api.dwf_digital_spi_write16( self._device.handle, dq_mode, bits_per_word, tx_buffer16, len(tx_buffer16) ) elif bits_per_word <= 32: tx_buffer32 = (ctypes.c_uint * len(buffer)).from_buffer_copy(buffer) api.dwf_digital_spi_write32( self._device.handle, dq_mode, bits_per_word, tx_buffer32, len(tx_buffer32) ) else: raise ValueError('bits_per_word cannot be higher than 32.')
[docs] def write_read( self, buffer, words_to_receive: int, dq_mode: Optional[int] = None, bits_per_word: int = 8, ) -> Union[bytes, array.array]: """Performs a SPI write/read.""" if dq_mode is None: dq_mode = self._dq_mode if bits_per_word <= 8: tx_buffer8 = (ctypes.c_ubyte * len(buffer)).from_buffer_copy(buffer) rx_buffer8 = (ctypes.c_ubyte * words_to_receive)() api.dwf_digital_spi_write_read( self._device.handle, dq_mode, bits_per_word, tx_buffer8, len(tx_buffer8), rx_buffer8, len(rx_buffer8), ) return bytes(rx_buffer8) elif bits_per_word <= 16: tx_buffer16 = (ctypes.c_ushort * len(buffer)).from_buffer_copy(buffer) rx_buffer16 = (ctypes.c_ushort * words_to_receive)() api.dwf_digital_spi_write_read16( self._device.handle, dq_mode, bits_per_word, tx_buffer16, len(tx_buffer16), rx_buffer16, len(rx_buffer16), ) return array.array('H', rx_buffer16) elif bits_per_word <= 32: tx_buffer32 = (ctypes.c_uint * len(buffer)).from_buffer_copy(buffer) rx_buffer32 = (ctypes.c_uint * words_to_receive)() api.dwf_digital_spi_write_read32( self._device.handle, dq_mode, bits_per_word, tx_buffer32, len(tx_buffer32), rx_buffer32, len(rx_buffer32), ) return array.array('I', rx_buffer32) else: raise ValueError('bits_per_word cannot be higher than 32.')
@staticmethod def _map_select_level(value) -> int: named_values = { 'l': 0, 'low': 0, 'h': 1, 'high': 1, 'z': -1, 'release': -1, } return Helpers.map_named_value(value, named_values)
[docs] class I2C: """I2C protocol.""" def __init__(self, device): self._device = device self._pin_scl = None self._pin_sda = None self._rate = None self._timeout = None self._read_nak = True self._stretch = True @property def pin_scl(self) -> int: """Gets or sets the DIO channel to use for I2C clock.""" return self._pin_scl @pin_scl.setter def pin_scl(self, value: int) -> None: self._pin_scl = value api.dwf_digital_i2c_scl_set(self._device.handle, value) @property def pin_sda(self) -> int: """Gets or sets the DIO channel to use for I2C data.""" return self._pin_sda @pin_sda.setter def pin_sda(self, value: int) -> None: self._pin_sda = value api.dwf_digital_i2c_sda_set(self._device.handle, value) @property def rate(self) -> float: """Gets or sets the data rate.""" return self._rate @rate.setter def rate(self, value: float) -> None: self._rate = value api.dwf_digital_i2c_rate_set(self._device.handle, value) @property def timeout(self) -> float: """Gets or sets the time-out.""" return self._timeout @timeout.setter def timeout(self, value: float) -> None: self._timeout = value api.dwf_digital_i2c_timeout_set(self._device.handle, value) @property def read_nak(self) -> bool: """Gets or sets a value indicating if the last read byte should be acknowledged or not.""" return self._read_nak @read_nak.setter def read_nak(self, value: bool) -> None: self._read_nak = value api.dwf_digital_i2c_read_nak_set(self._device.handle, value) @property def stretch(self) -> bool: """Enables or disables clock stretching.""" return self._stretch @stretch.setter def stretch(self, value: bool) -> None: self._stretch = value api.dwf_digital_i2c_stretch_set(self._device.handle, value)
[docs] def reset(self) -> None: """Resets the I2C configuration to default value.""" api.dwf_digital_i2c_reset(self._device.handle) self._pin_scl = None self._pin_sda = None self._rate = 100000.0 self._timeout = 1.0 self._read_nak = True self._stretch = True
[docs] def clear(self) -> bool: """Verifies and tries to solve eventual bus lockup. Returns true, if the bus is free.""" return bool(api.dwf_digital_i2c_clear(self._device.handle))
[docs] def setup( self, pin_scl: int, pin_sda: int, rate: Optional[float] = None, timeout: Optional[float] = None, read_nak: Optional[bool] = None, stretch: Optional[bool] = None, ) -> None: """Sets up the I2C configuration.""" if pin_scl is not None: self.pin_scl = pin_scl if pin_sda is not None: self.pin_sda = pin_sda if rate is not None: self.rate = rate if timeout is not None: self.timeout = timeout if read_nak is not None: self.read_nak = read_nak if stretch is not None: self.stretch = stretch
[docs] def write_one(self, address: int, data: int) -> None: """Performs an I2C write of a single byte.""" return api.dwf_digital_i2c_write_one(self._device.handle, address, data)
[docs] def read(self, address: int, bytes_to_read: int) -> Tuple[bytes, int]: """Performs an I2C read. Returns (rx_buffer, nak_index).""" rx_buffer8 = (ctypes.c_ubyte * bytes_to_read)() nak_index = api.dwf_digital_i2c_read( self._device.handle, address, rx_buffer8, len(rx_buffer8) ) return bytes(rx_buffer8), nak_index
[docs] def write(self, address: int, buffer: bytes) -> None: """Performs an I2C write.""" tx_buffer8 = (ctypes.c_ubyte * len(buffer)).from_buffer_copy(buffer) return api.dwf_digital_i2c_write( self._device.handle, address, tx_buffer8, len(tx_buffer8) )
[docs] def write_read(self, address: int, buffer: bytes, bytes_to_read: int) -> Tuple[bytes, int]: """Performs an I2C write/read. Returns (rx_buffer, nak_index).""" tx_buffer8 = (ctypes.c_ubyte * len(buffer)).from_buffer_copy(buffer) rx_buffer8 = (ctypes.c_ubyte * bytes_to_read)() nak_index = api.dwf_digital_i2c_write_read( self._device.handle, address, tx_buffer8, len(tx_buffer8), rx_buffer8, len(rx_buffer8), ) return bytes(rx_buffer8), nak_index
[docs] class CAN: """CAN protocol.""" def __init__(self, device): self._device = device self._pin_tx = None self._pin_rx = None self._rate = None self._inverted = None @property def pin_rx(self) -> int: """Gets or sets the DIO channel to use for reception.""" return self._pin_rx @pin_rx.setter def pin_rx(self, value: int) -> None: self._pin_rx = value api.dwf_digital_can_rx_set(self._device.handle, value) @property def pin_tx(self) -> int: """Gets or sets the DIO channel to use for transmission.""" return self._pin_tx @pin_tx.setter def pin_tx(self, value: int) -> None: self._pin_tx = value api.dwf_digital_can_tx_set(self._device.handle, value) @property def rate(self) -> float: """Gets or sets the data rate.""" return self._rate @rate.setter def rate(self, value: float) -> None: self._rate = value api.dwf_digital_can_rate_set(self._device.handle, value) @property def inverted(self) -> bool: """Gets or sets the polarity.""" return self._inverted @inverted.setter def inverted(self, value: bool) -> None: self._inverted = value api.dwf_digital_can_polarity_set(self._device.handle, value)
[docs] def reset(self) -> None: """Resets the CAN configuration to default value.""" api.dwf_digital_can_reset(self._device.handle) self._pin_tx = None self._pin_rx = None self._rate = None self._inverted = None
[docs] def setup( self, pin_rx: Optional[int] = None, pin_tx: Optional[int] = None, rate: Optional[float] = None, inverted: bool = False, ) -> None: """Sets up the CAN configuration.""" if pin_rx is not None: self.pin_rx = pin_rx if pin_tx is not None: self.pin_tx = pin_tx if rate is not None: self.rate = rate if inverted is not None: self.inverted = inverted api.dwf_digital_can_rx(self._device.handle, 0, None) api.dwf_digital_can_tx(self._device.handle, -1, 0, 0, 0, None)
[docs] def read(self) -> Tuple[bytes, int, int, int, int]: """Returns the received CAN frames since the last call.""" rx_buffer8 = (ctypes.c_char * 8)() frame_id, extended, remote, dlc, status = api.dwf_digital_can_rx( self._device.handle, rx_buffer8, len(rx_buffer8) ) return bytes(rx_buffer8)[:dlc], frame_id, extended, remote, status
[docs] def write(self, frame_id: int, extended: int, remote: int, buffer: bytes) -> None: """Performs a CAN transmission.""" tx_buffer8 = (ctypes.c_char * len(buffer)).from_buffer_copy(buffer) api.dwf_digital_can_tx( self._device.handle, frame_id, extended, remote, len(tx_buffer8), tx_buffer8 )
def __init__(self, device): self._uart = self.Uart(device) self._spi = self.Spi(device) self._i2c = self.I2C(device) self._can = self.CAN(device) @property def uart(self) -> Uart: """Gets the UART protocol unit.""" return self._uart @property def spi(self) -> Spi: """Gets the SPI protocol unit.""" return self._spi @property def i2c(self) -> I2C: """Gets the I2C protocol unit.""" return self._i2c @property def can(self) -> CAN: """Gets the CAN protocol unit.""" return self._can