endaq.device.mqtt

class endaq.device.mqtt.MQTTConnector(host=None, port=1883, name=None, username=None, password=None, clientArgs=None, connectArgs=None, autoupdate=True, updateCallback=None, connectCallback=None, disconnectCallback=None, **kwargs)

Class that manages the connection to the MQTT Broker and communication with the MQTT Device Manager.

Class that manages the connection to the MQTT Broker and communication with the MQTT Device Manager.

Parameters
  • host (str, default: None) – The hostname/IP of the MQTT broker. Defaults to the local machine. Note that localhost and 127.0.0.1 are explicitly converted to the local machine’s IP.

  • port (int, default: 1883) – The port to which to connect.

  • username (Optional[str], default: None) – The username to use to connect to the broker, if required.

  • password (Optional[str], default: None) – The password to use to connect to the broker, if required.

  • clientArgs (Dict[str, Any], default: None) – Additional arguments to be used in the instantiation of the paho.mqtt.client.Client.

  • connectArgs (Dict[str, Any], default: None) – Additional arguments to be used with paho.mqtt.client.Client.connect().

  • autoupdate (bool, default: True) – If True, known devices will have their status automatically updated when the MQTTDeviceManager publishes updates to its ‘state’ topic.

  • updateCallback (Callable, default: None) – A function to be called when a ‘state’ update is received from the MQTTDeviceManager. The function should acceptone argument, a dictionary of state data. This can be set later via the MQTTConnector.updateCallback attribute.

  • connectCallback (Callable, default: None) – A function to be called when the connection to the broker is established. The function should accept the same arguments as the on_connect handler of a paho.mqtt.client.Client. This can be set later via the MQTTConnector.connectCallback attribute.

  • disconnectCallback (Callable, default: None) – A function to be called when the connection to the broker is lost. The function should accept the same arguments as the on_disconnect handler of a paho.mqtt.client.Client. This can be set later via the MQTTConnector.disconnectCallback attribute.

addPort(subscriber)

Connect (or reconnect) an existing MQTTSerialPort to the client. To create a new virual serial port, use newPort().

connect(timeout=30)

Connect to the MQTT Broker (if not connected), and start the thread (if not running).

disconnect()

Disconnect from the MQTT Broker. This will close all remote devices’ connections as well. It can be reconnected by calling connect().

classmethod find(*patterns, **kwargs)

A convenience method for creating a new MQTTConnector using an MQTT broker discovered via mDNS. It calls endaq.device.mqtt.discovery.findBrokers() and then instantiates an MQTTConnector using the closest matching broker name. All keywords for both are accepted.

Parameters

patterns – Zero or more MQTT Broker names (multiple positional arguments). Glob-like wildcards may be used (case-sensitive). Defaults are used if no arguments are provided.

findDevice(sn=None, chipId=None, timeout=10.0, managerTimeout=None, offline=False, callback=None)

Find a specific remote recorder by serial number or unique chip ID. One or the other must be provided, but not both. This is equivalent to endaq.device.findDevice() for remote devices.

Parameters
  • sn (Union[str, int, None], default: None) – The serial number of the recorder to find. Cannot be used with chipId. It can be an integer or a formatted serial number string (e.g., 12345 or “S00012345”).

  • chipId (Union[str, int, None], default: None) – The chip ID of the recorder to find. Cannot be used with sn. It can be an integer or a hex string.

  • timeout (Union[int, float], default: 10.0) – Time (in seconds) to wait for a response from the Device Manager before raising a DeviceTimeout exception. None or -1 will wait indefinitely.

  • managerTimeout (Optional[int], default: None) – A value (in seconds) that overrides the remote Device Manager’s timeout that excludes inactive devices. 0 will return all devices, regardless of how long it has been since they reported to the Device Manager.

  • offline (bool, default: False) – If True, include devices that are reported to have disconnected.

  • callback (Optional[Callable], default: None) – A function to call each response-checking cycle. If the callback returns True, the wait for a response will be cancelled. The callback function requires no arguments.

getCachedHeader(device)

Retrieve a device’s cached IDE header data from the Device Manager (if known).

Parameters

device (Union[int, Recorder]) – The device for which to get the header data. Either a Recorder instance or a device serial number.

Returns

The cached IDE header data, as undecoded EBML.

Return type

bytearray

getDeviceInfo(timeout=10.0, managerTimeout=None, callback=None)

Get a list of DEVINFO data for active devices from the MQTT Device Manager.

Parameters
  • timeout (Union[int, float], default: 10.0) – Time (in seconds) to wait for a response from the Device Manager before raising a DeviceTimeout exception. None or -1 will wait indefinitely.

  • managerTimeout (Optional[int], default: None) – A value (in seconds) that overrides the remote Device Manager’s timeout that excludes inactive devices. 0 will return all devices, regardless of how long it has been since they reported to the Device Manager.

  • callback (Optional[Callable], default: None) – A function to call each response-checking cycle. If the callback returns True, the wait for a response will be cancelled. The callback function requires no arguments.

getDevices(update=False, timeout=10.0, managerTimeout=None, offline=False, callback=None)

Get a list of remote data recorder objects from the MQTT broker. This method also updates the status of existing MQTT Recorder instances.

Parameters
  • update (bool, default: False) – If True, update previously discovered devices connected via USB (serial or storage device) to an MQTT interface and include them in the results.

  • timeout (Union[int, float], default: 10.0) – Time (in seconds) to wait for a response from the Device Manager before raising a DeviceTimeout exception. None or -1 will wait indefinitely.

  • managerTimeout (Optional[int], default: None) – A value (in seconds) that overrides the remote Device Manager’s timeout that excludes inactive devices. 0 will return all devices, regardless of how long it has been since they reported to the Device Manager.

  • offline (bool, default: False) – If True, include devices that are reported to have disconnected.

  • callback (Optional[Callable], default: None) – A function to call each response-checking cycle. If the callback returns True, the wait for a response will be cancelled. The callback function requires no arguments.

newPort(read=None, write=None, timeout=None, write_timeout=None, maxsize=16384, qos=1)

Create a new virtual port for Serial-over-MQTT. Using this method is recommended over directly instantiating an MQTTSerialPort.

Parameters
  • read (Optional[str], default: None) – The MQTT topic serving as RX. Can be None if the port is only written to.

  • write (Optional[str], default: None) – The MQTT topic serving as TX. Can be None if the port is only read from.

  • timeout (Optional[float], default: None) – Timeout (seconds) for port reads.

  • write_timeout (Optional[float], default: None) – Timeout (seconds) for port writes.

  • maxsize (int, default: 16384) – The maximum size of the read buffer.

  • qos (int, default: 1) – MQTT quality of service for writes.

removePort(subscriber)

Disconnect an MQTTSerialPort from the client.

resubscribe()

Resubscribe to all topics currently subscribed to. For use after changing a broker connection (e.g., its IP changed after rebooting).

subscribe(topic, *args, **kwargs)

Wrapper for subscribing to MQTT topics, which are stored for resubscribing if the broker connection changes (e.g., its IP changed after rebooting).

unsubscribe(topic, properties=None)

Wrapper for unsubscribing to MQTT topics, which also removes them from the set of cached topics.

MQTT Device Manager

A client that monitors several MQTT topics, keeping track of sensors and other devices, and providing additional features for device discovery and data streaming.

Starting an MQTTDeviceManager is typically done via the start() function.

class endaq.device.mqtt.manager.MQTTDeviceManager(client, make_crc=True, ignore_crc=False, interval=45, cache='/home/docs/.endaq/mqtt_manager', shutdown=False)

A client that monitors several MQTT topics, keeping track of sensors and other devices, and providing additional features for device discovery and data streaming.

Starting an MQTTDeviceManager is typically done via the start() function.

A client that monitors several MQTT topics, providing additional features for device discovery and data streaming.

Parameters
  • client (Client) – The manager’s MQTT client.

  • make_crc (bool, default: True) – If True, generate CRCs for outgoing commands and responses.

  • ignore_crc (bool, default: False) – If False, do not validate incoming commands or responses.

  • interval (int, default: 45) – The time between published state updates. If 0, no state updates will be published.

  • cache (Union[str, Path, BaseCache], default: '/home/docs/.endaq/mqtt_manager') – The location for cached device data, either a directory or an instance of a BaseCache storage handler.

  • shutdown (bool, default: False) – If True, the manager can be shut down remotely via the Shutdown EBML command.

cleanCache(retention=24)

Clean out cached header data.

Parameters

retention (int, default: 24) – The cached file retention period. Files not modified in retention hours will be removed.

command_GetDeviceList(payload, lockId=None)

Handle a GetDeviceList command (EBML ID 0x5C00).

command_GetIDEHeader(payload, lockId=None)

Handle a GetIDEHeader command (EBML ID 0x5C20).

command_Shutdown(payload, lockId=None)

Handle a Shutdown command (EBML ID 0x5CFF). May be refused. This may not return anything to the sender, as the shutdown is immediate.

getDevice(sn)

Get or create an MQTTDevice instance.

Parameters

sn (int) – The sending device’s serial number.

getSenderSerial(topic)

Extract the sending device’s serial number from the name of an MQTT topic.

Parameters

topic (str) – The message topic.

Returns

The serial number. Typically an integer, but special cases (like the manager) may have a string value.

Return type

Union[int, str]

onConnect(*args)

MQTT event handler called when the client connects.

onStateMessage(_client, _userdata, message)

Handle a message received in the announcement topic.

stop()

Shut down the Device Manager (and Advertiser, if running).

updateState()

Publish an updated set of data to the ‘state’ topic.

endaq.device.mqtt.manager.start(host=None, port=1883, advertise=True, name='Data Collection Box Interface._endaq._tcp.local.', rename=False, background=True, clientArgs=None, connectArgs=None, advertArgs=None, managerArgs=None, clean=None)

Start the Device Manager and (optionally) the mDNS advertiser. This is a temporary implementation and will be refactored.

Parameters
  • host (Optional[str], default: None) – The hostname/IP of the MQTT broker. Defaults to the current machine’s.

  • port (int, default: 1883) – The port to which to connect.

  • advertise (bool, default: True) – If True, start the mDNS advertising of the broker.

  • name (Optional[str], default: 'Data Collection Box Interface._endaq._tcp.local.') – The name under which the MQTT broker will be advertised.

  • rename (bool, default: False) – If True and the broker name is already being advertised, add an incrementing number until the name is unique.

  • background (bool, default: True) – If True, this function returns an MQTTDeviceManager instance with the client loop running in a thread. If False, the function will run the client loop in the foreground and will not return.

  • clientArgs (Dict[str, Any], default: None) – A dictionary of additional keyword arguments to be used in the instantiation of the paho.mqtt.client.Client.

  • connectArgs (Dict[str, Any], default: None) – A dictionary of additional keyword arguments to be used with paho.mqtt.client.Client.connect().

  • advertArgs (Dict[str, Any], default: None) – A dictionary of additional keyword arguments to be used in the instantiation of the Advertiser (if advertise is True).

  • managerArgs (Dict[str, Any], default: None) – A dictionary of additional keyword arguments to be used in the instantiation of the MQTTDeviceManager.

  • clean (Optional[int], default: None) – If not None, remove cached header data older than clean hours on Device Manager startup.

Returns

The running MQTTDeviceManager if background, else the function runs indefinitely without returning.

endaq.device.mqtt.manager.stop()

Shut down all running MQTTDeviceManager instances. A convenience function intended for use if a reference to the MQTTDeviceManager wasn’t kept (e.g., started with start(), not m = start()) or was otherwise lost.