pip_services3_mqtt.queues.MqttMessageQueue module
-
class
pip_services3_mqtt.queues.MqttMessageQueue.
MqttMessageQueue
(name: str = None)¶ Bases:
pip_services3_messaging.queues.MessageQueue.MessageQueue,
pip_services3_commons.refer.IUnreferenceable.IUnreferenceable,
pip_services3_commons.run.IOpenable.IOpenable,
pip_services3_commons.run.ICleanable.ICleanable
Message queue that sends and receives messages via MQTT message broker.
MQTT is a popular lightweight protocol for communication between IoT devices.
### Configuration parameters ###
- topic: name of the MQTT topic to subscribe to
- connection(s):
discovery_key: (optional) a key to retrieve the connection from IDiscovery
host: host name or IP address
port: port number
uri: resource URI or connection string with all parameters in it
- credential(s):
store_key: (optional) a key to retrieve the credentials from ICredentialStore
username: user name
password: user password
- options:
serialize_envelope: (optional) true to serialize entire message as JSON, false to send only message payload (default: true)
autosubscribe: (optional) true to automatically subscribe on open (default: false)
qos: (optional) quality of service level aka QOS (default: 0)
retain: (optional) retention flag for published messages (default: false)
retry_connect: (optional) turns on/off automated reconnect when the connection is lost (default: true)
connect_timeout: (optional) number of milliseconds to wait for connection (default: 30000)
reconnect_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 1000)
keepalive_timeout: (optional) number of milliseconds to ping broker while inactive (default: 3000)
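The parameters above are addressed as flat, dot-separated keys, as the `connection.host` key in the class example further down suggests. As a minimal sketch, the helper below builds such a key map from alternating key/value arguments using a plain dict. `from_tuples` here is a hypothetical stand-in written for illustration, not the library's `ConfigParams.from_tuples`, and the `credential.` and `options.` prefixes are assumed from the section names rather than confirmed by this page.

```python
def from_tuples(*pairs):
    """Hypothetical stand-in: build a flat config map from key, value, key, value, ..."""
    if len(pairs) % 2 != 0:
        raise ValueError("expected an even number of arguments (key/value pairs)")
    return {str(pairs[i]): pairs[i + 1] for i in range(0, len(pairs), 2)}


config = from_tuples(
    "topic", "mytopic",
    "connection.host", "localhost",
    "connection.port", 1883,
    "credential.username", "user",     # assumed key prefix, placeholder value
    "credential.password", "secret",   # assumed key prefix, placeholder value
    "options.qos", 1,                  # assumed key prefix
    "options.retain", False,
    "options.connect_timeout", 30000,  # milliseconds
)
```

With the real library, the same key/value sequence would presumably be passed to `ConfigParams.from_tuples` and then to `configure`.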
### References ###
- *:logger:*:*:1.0 (optional) ILogger components to pass log messages
- *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements
- *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections
- *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
- *:connection:mqtt:*:1.0 (optional) Shared connection to MQTT service
See: MessageQueue, MessagingCapabilities
Example:
    queue = MqttMessageQueue("myqueue")
    queue.configure(ConfigParams.from_tuples(
        "topic", "mytopic",
        'connection.protocol', 'mqtt',
        "connection.host", "localhost",
        "connection.port", 1883
    ))

    queue.open("123")

    queue.send("123", MessageEnvelope(None, "mymessage", "ABC"))

    message = queue.receive("123", 10000)
-
abandon
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment, so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to a dead letter queue.
Important: This method is not supported by MQTT.
- Parameters
message – a message to return.
-
clear
(correlation_id: Optional[str])¶ Clears component state.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
close
(correlation_id: Optional[str])¶ Closes component and frees used resources.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
complete
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.
Important: This method is not supported by MQTT.
- Parameters
message – a message to remove.
-
configure
(config: pip_services3_commons.config.ConfigParams.ConfigParams)¶ Configures object by passing configuration parameters.
- Parameters
config – configuration parameters to be set.
-
end_listen
(correlation_id: Optional[str])¶ Ends listening for incoming messages. When this method is called, listen unblocks the thread and execution continues.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
is_open
() → bool¶ Checks if the component is opened.
- Returns
true if the component has been opened and false otherwise.
-
listen
(correlation_id: Optional[str], receiver: pip_services3_messaging.queues.IMessageReceiver.IMessageReceiver)¶ Listens for incoming messages and blocks the current thread until the queue is closed.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
receiver – a receiver to receive incoming messages.
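Because listen blocks the calling thread and end_listen releases it, the two are typically driven from different threads. The sketch below illustrates that contract only, using an in-memory stand-in rather than a broker. `InMemoryQueue` and `CollectingReceiver` are hypothetical classes written for this example, and the `receive_message(message, source_queue)` callback name is an assumption about the receiver interface, not something this page confirms.

```python
import queue
import threading
import time


class InMemoryQueue:
    """Hypothetical stand-in that mimics only the listen/end_listen contract."""

    def __init__(self):
        self._messages = queue.Queue()
        self._stop = threading.Event()

    def send(self, correlation_id, message):
        self._messages.put(message)

    def listen(self, correlation_id, receiver):
        # Block the calling thread until end_listen is called.
        self._stop.clear()
        while not self._stop.is_set():
            try:
                message = self._messages.get(timeout=0.05)
            except queue.Empty:
                continue
            receiver.receive_message(message, self)

    def end_listen(self, correlation_id):
        self._stop.set()


class CollectingReceiver:
    """Receiver that stores every message it is handed (assumed callback name)."""

    def __init__(self):
        self.received = []

    def receive_message(self, message, source_queue):
        self.received.append(message)


mq = InMemoryQueue()
receiver = CollectingReceiver()

# Run the blocking listen call on a worker thread.
worker = threading.Thread(target=mq.listen, args=("123", receiver))
worker.start()

mq.send("123", "hello")

# Wait (with a deadline) until the message has been delivered.
deadline = time.monotonic() + 2.0
while not receiver.received and time.monotonic() < deadline:
    time.sleep(0.01)

mq.end_listen("123")  # unblocks listen on the worker thread
worker.join(timeout=2)
```

Running listen on a dedicated thread keeps the main thread free to call end_listen or close when the application shuts down.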
-
move_to_dead_letter
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Permanently removes a message from the queue and sends it to dead letter queue.
Important: This method is not supported by MQTT.
- Parameters
message – a message to be removed.
-
on_message
(topic: str, data: Any, packet: Any)¶
-
open
(correlation_id: Optional[str])¶ Opens the component.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
peek
(correlation_id: Optional[str]) → pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope¶ Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns None.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- Returns
a peeked message.
-
peek_batch
(correlation_id: Optional[str], message_count: int) → List[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]¶ Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.
Important: This method is not supported by MQTT.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
message_count – a maximum number of messages to peek.
- Returns
a list with peeked messages.
-
read_message_count
() → int¶ Reads the current number of messages in the queue to be delivered.
- Returns
number of available messages.
-
receive
(correlation_id: Optional[str], wait_timeout: int) → pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope¶ Receives an incoming message and removes it from the queue.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
wait_timeout – a timeout in milliseconds to wait for a message to come.
- Returns
a received message.
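Note that wait_timeout is given in milliseconds, while Python's standard blocking calls take seconds. The sketch below shows that conversion on a plain `queue.Queue`. The `receive` function is a hypothetical helper, not the library's method, and returning None when the timeout expires is an assumption: this page states that behavior for peek but not for receive.

```python
import queue


def receive(buffer, wait_timeout_ms):
    """Wait up to wait_timeout_ms milliseconds for a message.

    Returns None if nothing arrives in time (assumed behavior).
    """
    try:
        return buffer.get(timeout=wait_timeout_ms / 1000.0)
    except queue.Empty:
        return None


buffer = queue.Queue()
buffer.put("mymessage")

first = receive(buffer, 10000)  # a message is already waiting, returns at once
second = receive(buffer, 50)    # buffer is empty, gives up after about 50 ms
```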
-
renew_lock
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, lock_timeout: int)¶ Renews a lock on a message that makes it invisible to other receivers in the queue. This method is usually used to extend the message processing time.
Important: This method is not supported by MQTT.
- Parameters
message – a message to extend its lock.
lock_timeout – a locking timeout in milliseconds.
-
send
(correlation_id: Optional[str], message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Sends a message into the queue.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
message – a message envelope to be sent.
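What is actually published depends on the serialize_envelope option described under the configuration parameters: either the entire message as JSON or only its payload. The sketch below illustrates the difference. `Envelope` and `to_payload` are hypothetical names, and the JSON field names are illustrative only, since this page does not show the library's wire format.

```python
import json


class Envelope:
    """Hypothetical stand-in for MessageEnvelope; field names are illustrative."""

    def __init__(self, correlation_id, message_type, message):
        self.correlation_id = correlation_id
        self.message_type = message_type
        self.message = message

    def to_dict(self):
        return {
            "correlation_id": self.correlation_id,
            "message_type": self.message_type,
            "message": self.message,
        }


def to_payload(envelope, serialize_envelope=True):
    if serialize_envelope:
        # Whole envelope as JSON, so the metadata travels with the payload.
        return json.dumps(envelope.to_dict()).encode("utf-8")
    # Only the message payload; the envelope metadata is dropped.
    return envelope.message.encode("utf-8")


envelope = Envelope("123", "mymessage", "ABC")
full = to_payload(envelope, serialize_envelope=True)
bare = to_payload(envelope, serialize_envelope=False)
```

Sending only the payload is useful when the consumers are plain MQTT clients that do not know about the envelope format, at the cost of losing the correlation id and message type.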
-
set_references
(references: pip_services3_commons.refer.IReferences.IReferences)¶ Sets references to dependent components.
- Parameters
references – references to locate the component dependencies.
-
unset_references
()¶ Unsets (clears) previously set references to dependent components.