endaq.device.mqtt¶
- class endaq.device.mqtt.MQTTConnector(host=None, port=1883, name=None, username=None, password=None, clientArgs=None, connectArgs=None, autoupdate=True, updateCallback=None, connectCallback=None, disconnectCallback=None, **kwargs)¶
Class that manages the connection to the MQTT Broker and communication with the MQTT Device Manager.
- Parameters
host (str, default: None) – The hostname/IP of the MQTT broker. Defaults to the local machine. Note that localhost and 127.0.0.1 are explicitly converted to the local machine’s IP.
port (int, default: 1883) – The port to which to connect.
username (Optional[str], default: None) – The username to use to connect to the broker, if required.
password (Optional[str], default: None) – The password to use to connect to the broker, if required.
clientArgs (Dict[str, Any], default: None) – Additional arguments to be used in the instantiation of the paho.mqtt.client.Client.
connectArgs (Dict[str, Any], default: None) – Additional arguments to be used with paho.mqtt.client.Client.connect().
autoupdate (bool, default: True) – If True, known devices will have their status automatically updated when the MQTTDeviceManager publishes updates to its ‘state’ topic.
updateCallback (Callable, default: None) – A function to be called when a ‘state’ update is received from the MQTTDeviceManager. The function should accept one argument, a dictionary of state data. This can be set later via the MQTTConnector.updateCallback attribute.
connectCallback (Callable, default: None) – A function to be called when the connection to the broker is established. The function should accept the same arguments as the on_connect handler of a paho.mqtt.client.Client. This can be set later via the MQTTConnector.connectCallback attribute.
disconnectCallback (Callable, default: None) – A function to be called when the connection to the broker is lost. The function should accept the same arguments as the on_disconnect handler of a paho.mqtt.client.Client. This can be set later via the MQTTConnector.disconnectCallback attribute.
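As a minimal sketch of the updateCallback contract described above (one argument, a dictionary of state data), the following defines a callback and a helper that passes it to the constructor. The names on_state_update, received_states, and make_connector are illustrative, and the keys inside the state dictionary are not documented here, so the callback only stores what it receives. The import is deferred so the callback can be exercised without a broker.

```python
from typing import Any, Dict, List, Optional

# Collected 'state' payloads; a plain list is used purely for illustration.
received_states: List[Dict[str, Any]] = []


def on_state_update(state: Dict[str, Any]) -> None:
    """Accept the single dictionary argument described for updateCallback."""
    received_states.append(state)


def make_connector(host: Optional[str] = None):
    """Build an MQTTConnector that reports 'state' updates to on_state_update.

    The import is deferred so this sketch can be loaded without the
    endaq.device package installed.
    """
    from endaq.device.mqtt import MQTTConnector

    return MQTTConnector(host=host, port=1883, autoupdate=True,
                         updateCallback=on_state_update)
```

The same function could instead be assigned after construction through the MQTTConnector.updateCallback attribute.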
- addPort(subscriber)¶
Connect (or reconnect) an existing MQTTSerialPort to the client. To create a new virtual serial port, use newPort().
- connect(timeout=30)¶
Connect to the MQTT Broker (if not connected), and start the thread (if not running).
- disconnect()¶
Disconnect from the MQTT Broker. This will close all remote devices’ connections as well. It can be reconnected by calling connect().
- classmethod find(*patterns, **kwargs)¶
A convenience method for creating a new MQTTConnector using an MQTT broker discovered via mDNS. It calls endaq.device.mqtt.discovery.findBrokers() and then instantiates an MQTTConnector using the closest matching broker name. All keywords for both are accepted.
- Parameters
patterns – Zero or more MQTT Broker names (multiple positional arguments). Glob-like wildcards may be used (case-sensitive). Defaults are used if no arguments are provided.
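To illustrate what case-sensitive, glob-like matching of broker names means, here is a small sketch using the standard library's fnmatch.fnmatchcase. It is not the matching code used by find() or findBrokers(); match_names is a hypothetical helper, and returning every name when no pattern is given is an assumption of this sketch rather than the library's documented defaults.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List


def match_names(names: Iterable[str], *patterns: str) -> List[str]:
    """Return the names matching any glob pattern, comparing case-sensitively."""
    if not patterns:
        # Assumption for this sketch only: no patterns means "accept all".
        return list(names)
    return [n for n in names if any(fnmatchcase(n, p) for p in patterns)]


brokers = ["Data Collection Box Interface", "data collection box", "Lab Broker 2"]
print(match_names(brokers, "Data*"))      # only the capitalized name matches
print(match_names(brokers, "*Broker ?"))  # '?' matches exactly one character
```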
- findDevice(sn=None, chipId=None, timeout=10.0, managerTimeout=None, offline=False, callback=None)¶
Find a specific remote recorder by serial number or unique chip ID. One or the other must be provided, but not both. This is equivalent to endaq.device.findDevice() for remote devices.
- Parameters
sn (Union[str, int, None], default: None) – The serial number of the recorder to find. Cannot be used with chipId. It can be an integer or a formatted serial number string (e.g., 12345 or “S00012345”).
chipId (Union[str, int, None], default: None) – The chip ID of the recorder to find. Cannot be used with sn. It can be an integer or a hex string.
timeout (Union[int, float], default: 10.0) – Time (in seconds) to wait for a response from the Device Manager before raising a DeviceTimeout exception. None or -1 will wait indefinitely.
managerTimeout (Optional[int], default: None) – A value (in seconds) that overrides the remote Device Manager’s timeout that excludes inactive devices. 0 will return all devices, regardless of how long it has been since they reported to the Device Manager.
offline (bool, default: False) – If True, include devices that are reported to have disconnected.
callback (Optional[Callable], default: None) – A function to call each response-checking cycle. If the callback returns True, the wait for a response will be cancelled. The callback function requires no arguments.
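The accepted identifier forms can be sketched as follows: a serial number given as 12345 or "S00012345", a chip ID given as an integer or hex string, and the rule that exactly one of the two is supplied. These helpers are hypothetical and are not the library's own parsing; in particular, raising ValueError for a bad combination is an assumption, since the exception type is not stated above.

```python
import re
from typing import Optional, Union


def normalize_serial(sn: Union[str, int]) -> int:
    """Convert 12345 or a formatted string such as 'S00012345' to an int."""
    if isinstance(sn, int):
        return sn
    match = re.fullmatch(r"[A-Za-z]*(\d+)", sn.strip())
    if match is None:
        raise ValueError(f"Unrecognized serial number: {sn!r}")
    return int(match.group(1))


def normalize_chip_id(chip_id: Union[str, int]) -> int:
    """Convert an integer or hex string (with or without '0x') to an int."""
    if isinstance(chip_id, int):
        return chip_id
    return int(chip_id.strip(), 16)


def check_identifier(sn: Optional[Union[str, int]] = None,
                     chipId: Optional[Union[str, int]] = None) -> None:
    """Require exactly one of sn or chipId (ValueError is an assumption)."""
    if (sn is None) == (chipId is None):
        raise ValueError("Provide either sn or chipId, but not both")
```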
- getCachedHeader(device)¶
Retrieve a device’s cached IDE header data from the Device Manager (if known).
- Parameters
device (Union[int, Recorder]) – The device for which to get the header data. Either a Recorder instance or a device serial number.
- Returns
The cached IDE header data, as undecoded EBML.
- Return type
bytearray
- getDeviceInfo(timeout=10.0, managerTimeout=None, callback=None)¶
Get a list of DEVINFO data for active devices from the MQTT Device Manager.
- Parameters
timeout (Union[int, float], default: 10.0) – Time (in seconds) to wait for a response from the Device Manager before raising a DeviceTimeout exception. None or -1 will wait indefinitely.
managerTimeout (Optional[int], default: None) – A value (in seconds) that overrides the remote Device Manager’s timeout that excludes inactive devices. 0 will return all devices, regardless of how long it has been since they reported to the Device Manager.
callback (Optional[Callable], default: None) – A function to call each response-checking cycle. If the callback returns True, the wait for a response will be cancelled. The callback function requires no arguments.
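The interaction between timeout and callback can be pictured with a generic polling loop. This is only a sketch of the semantics described above, not the library's implementation: wait_for_response, the poll argument, and the polling interval are hypothetical, the built-in TimeoutError stands in for DeviceTimeout, and returning None on cancellation is an assumption.

```python
import time
from typing import Callable, Optional, Union


def wait_for_response(poll: Callable[[], Optional[object]],
                      timeout: Union[int, float, None] = 10.0,
                      callback: Optional[Callable[[], bool]] = None,
                      interval: float = 0.01):
    """Poll until a response arrives, the callback cancels, or time runs out."""
    # None or -1 means "wait indefinitely", as in the parameter description.
    indefinite = timeout is None or timeout == -1
    deadline = None if indefinite else time.monotonic() + timeout
    while True:
        response = poll()
        if response is not None:
            return response
        # The callback takes no arguments; a true result cancels the wait.
        if callback is not None and callback():
            return None
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("No response before the timeout expired")
        time.sleep(interval)
```

A callback of this kind is a natural place to check a "Cancel" button or a threading.Event without blocking the caller for the full timeout.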
- getDevices(update=False, timeout=10.0, managerTimeout=None, offline=False, callback=None)¶
Get a list of remote data recorder objects from the MQTT broker. This method also updates the status of existing MQTT Recorder instances.
- Parameters
update (bool, default: False) – If True, update previously discovered devices connected via USB (serial or storage device) to an MQTT interface and include them in the results.
timeout (Union[int, float], default: 10.0) – Time (in seconds) to wait for a response from the Device Manager before raising a DeviceTimeout exception. None or -1 will wait indefinitely.
managerTimeout (Optional[int], default: None) – A value (in seconds) that overrides the remote Device Manager’s timeout that excludes inactive devices. 0 will return all devices, regardless of how long it has been since they reported to the Device Manager.
offline (bool, default: False) – If True, include devices that are reported to have disconnected.
callback (Optional[Callable], default: None) – A function to call each response-checking cycle. If the callback returns True, the wait for a response will be cancelled. The callback function requires no arguments.
- newPort(read=None, write=None, timeout=None, write_timeout=None, maxsize=16384, qos=1)¶
Create a new virtual port for Serial-over-MQTT. Using this method is recommended over directly instantiating an MQTTSerialPort.
- Parameters
read (Optional[str], default: None) – The MQTT topic serving as RX. Can be None if the port is only written to.
write (Optional[str], default: None) – The MQTT topic serving as TX. Can be None if the port is only read from.
timeout (Optional[float], default: None) – Timeout (seconds) for port reads.
write_timeout (Optional[float], default: None) – Timeout (seconds) for port writes.
maxsize (int, default: 16384) – The maximum size of the read buffer.
qos (int, default: 1) – MQTT quality of service for writes.
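A short sketch of calling newPort() with the keywords listed above, for a two-way port and for a read-only port. The helper names and the one-second timeouts are illustrative choices, any topic names passed in are up to the caller (none are defined by this page), and the connector argument is assumed to be a connected MQTTConnector.

```python
def open_command_port(connector, rx_topic: str, tx_topic: str):
    """Create a two-way virtual port with one-second read/write timeouts."""
    return connector.newPort(read=rx_topic, write=tx_topic,
                             timeout=1.0, write_timeout=1.0, qos=1)


def open_listen_only_port(connector, rx_topic: str):
    """Create a port that is only read from, so no TX topic is given."""
    return connector.newPort(read=rx_topic, write=None)
```

Going through the connector rather than instantiating MQTTSerialPort directly follows the recommendation in the method description.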
- removePort(subscriber)¶
Disconnect an MQTTSerialPort from the client.
- resubscribe()¶
Resubscribe to all topics currently subscribed to. For use after changing a broker connection (e.g., its IP changed after rebooting).
- subscribe(topic, *args, **kwargs)¶
Wrapper for subscribing to MQTT topics, which are stored for resubscribing if the broker connection changes (e.g., its IP changed after rebooting).
- unsubscribe(topic, properties=None)¶
Wrapper for unsubscribing from MQTT topics, which also removes them from the set of cached topics.
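The subscribe/unsubscribe/resubscribe trio amounts to caching topics so they can be replayed after the broker connection changes. The class below is a hypothetical illustration of that pattern, not the MQTTConnector source. It wraps any object with paho-style subscribe() and unsubscribe() methods, and it keeps the original arguments in a dictionary (an assumption of this sketch) so a replay uses the same options.

```python
from typing import Any, Dict, Tuple


class TopicCache:
    """Remember subscriptions so they can be replayed on a new connection."""

    def __init__(self, client: Any):
        self.client = client
        # topic -> (positional args, keyword args) used when subscribing
        self.topics: Dict[str, Tuple[tuple, dict]] = {}

    def subscribe(self, topic: str, *args, **kwargs):
        self.topics[topic] = (args, kwargs)
        return self.client.subscribe(topic, *args, **kwargs)

    def unsubscribe(self, topic: str, properties=None):
        # Forget the topic so a later resubscribe() does not restore it.
        self.topics.pop(topic, None)
        return self.client.unsubscribe(topic, properties)

    def resubscribe(self) -> None:
        for topic, (args, kwargs) in list(self.topics.items()):
            self.client.subscribe(topic, *args, **kwargs)
```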
MQTT Device Manager¶
A client that monitors several MQTT topics, keeping track of sensors and other devices, and providing additional features for device discovery and data streaming.
Starting an MQTTDeviceManager is typically done via the start() function.
- class endaq.device.mqtt.manager.MQTTDeviceManager(client, make_crc=True, ignore_crc=False, interval=45, cache='/home/docs/.endaq/mqtt_manager', shutdown=False)¶
- Parameters
client (Client) – The manager’s MQTT client.
make_crc (bool, default: True) – If True, generate CRCs for outgoing commands and responses.
ignore_crc (bool, default: False) – If True, do not validate the CRCs of incoming commands or responses.
interval (int, default: 45) – The time between published state updates. If 0, no state updates will be published.
cache (Union[str, Path, BaseCache], default: '/home/docs/.endaq/mqtt_manager') – The location for cached device data, either a directory or an instance of a BaseCache storage handler.
shutdown (bool, default: False) – If True, the manager can be shut down remotely via the Shutdown EBML command.
- cleanCache(retention=24)¶
Clean out cached header data.
- Parameters
retention (int, default: 24) – The cached file retention period. Files not modified in retention hours will be removed.
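The retention rule (remove files not modified in the last retention hours) can be sketched with the standard library. This is an illustration of the rule only, assuming the cache is a flat directory of files; clean_cache_dir is a hypothetical helper and says nothing about how a BaseCache storage handler stores its data.

```python
import time
from pathlib import Path
from typing import List, Optional, Union


def clean_cache_dir(cache_dir: Union[str, Path], retention: int = 24,
                    now: Optional[float] = None) -> List[str]:
    """Delete files whose modification time is older than `retention` hours.

    Returns the sorted names of the files that were removed.
    """
    now = time.time() if now is None else now
    cutoff = now - retention * 3600  # hours converted to seconds
    removed = []
    for path in Path(cache_dir).iterdir():
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path.name)
    return sorted(removed)
```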
- command_GetDeviceList(payload, lockId=None)¶
Handle a GetDeviceList command (EBML ID 0x5C00).
- command_GetIDEHeader(payload, lockId=None)¶
Handle a GetIDEHeader command (EBML ID 0x5C20).
- command_Shutdown(payload, lockId=None)¶
Handle a Shutdown command (EBML ID 0x5CFF). May be refused. This may not return anything to the sender, as the shutdown is immediate.
- getDevice(sn)¶
Get or create an MQTTDevice instance.
- Parameters
sn (int) – The sending device’s serial number.
- getSenderSerial(topic)¶
Extract the sending device’s serial number from the name of an MQTT topic.
- Parameters
topic (str) – The message topic.
- Returns
The serial number. Typically an integer, but special cases (like the manager) may have a string value.
- Return type
Union[int, str]
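The Union[int, str] return type can be illustrated with a small parser. The topic layout is not specified on this page, so the one used here (the sender in the second slash-separated segment, as in "endaq/12345/state") is purely hypothetical, as is the helper name sender_from_topic.

```python
from typing import Union


def sender_from_topic(topic: str, index: int = 1) -> Union[int, str]:
    """Return the sender segment of a topic as an int when it is numeric.

    `index` selects the slash-separated segment holding the sender; the
    default of 1 reflects the hypothetical layout, not a documented one.
    """
    sender = topic.split("/")[index]
    if sender.isascii() and sender.isdigit():
        return int(sender)
    # Special senders (like the manager) keep their string name.
    return sender


print(sender_from_topic("endaq/12345/state"))    # numeric sender
print(sender_from_topic("endaq/manager/state"))  # named sender
```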
- onConnect(*args)¶
MQTT event handler called when the client connects.
- onStateMessage(_client, _userdata, message)¶
Handle a message received in the announcement topic.
- stop()¶
Shut down the Device Manager (and Advertiser, if running).
- updateState()¶
Publish an updated set of data to the ‘state’ topic.
- endaq.device.mqtt.manager.start(host=None, port=1883, advertise=True, name='Data Collection Box Interface._endaq._tcp.local.', rename=False, background=True, clientArgs=None, connectArgs=None, advertArgs=None, managerArgs=None, clean=None)¶
Start the Device Manager and (optionally) the mDNS advertiser. This is a temporary implementation and will be refactored.
- Parameters
host (Optional[str], default: None) – The hostname/IP of the MQTT broker. Defaults to the current machine’s.
port (int, default: 1883) – The port to which to connect.
advertise (bool, default: True) – If True, start the mDNS advertising of the broker.
name (Optional[str], default: 'Data Collection Box Interface._endaq._tcp.local.') – The name under which the MQTT broker will be advertised.
rename (bool, default: False) – If True and the broker name is already being advertised, add an incrementing number until the name is unique.
background (bool, default: True) – If True, this function returns an MQTTDeviceManager instance with the client loop running in a thread. If False, the function will run the client loop in the foreground and will not return.
clientArgs (Dict[str, Any], default: None) – A dictionary of additional keyword arguments to be used in the instantiation of the paho.mqtt.client.Client.
connectArgs (Dict[str, Any], default: None) – A dictionary of additional keyword arguments to be used with paho.mqtt.client.Client.connect().
advertArgs (Dict[str, Any], default: None) – A dictionary of additional keyword arguments to be used in the instantiation of the Advertiser (if advertise is True).
managerArgs (Dict[str, Any], default: None) – A dictionary of additional keyword arguments to be used in the instantiation of the MQTTDeviceManager.
clean (Optional[int], default: None) – If not None, remove cached header data older than clean hours on Device Manager startup.
- Returns
The running MQTTDeviceManager if background, else the function runs indefinitely without returning.
- endaq.device.mqtt.manager.stop()¶
Shut down all running MQTTDeviceManager instances. A convenience function intended for use if a reference to the MQTTDeviceManager wasn’t kept (e.g., started with start(), not m = start()) or was otherwise lost.
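The difference between keeping and losing the manager reference can be sketched as follows. run_manager is a hypothetical helper that takes the start function as an argument (so it can be tried without a broker), starts the manager in the background, and stops it through the returned instance. The duration value is an illustrative choice.

```python
import time


def run_manager(start_func, duration: float = 5.0, **start_kwargs):
    """Start a background manager, let it run briefly, then stop it."""
    # background=True makes start() return the running manager instance.
    manager = start_func(background=True, **start_kwargs)
    try:
        time.sleep(duration)
    finally:
        # The reference was kept, so the instance's own stop() can be used.
        manager.stop()
    return manager


# Intended use with the real function (requires a reachable MQTT broker):
#     from endaq.device.mqtt.manager import start
#     run_manager(start, duration=60, clean=24)
```

If the return value of start() is discarded instead, the module-level stop() described above is the way to shut the manager down.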