pip_services3_messaging.queues package¶
Submodules¶
- pip_services3_messaging.queues.CachedMessageQueue module
- pip_services3_messaging.queues.CallbackMessageReceiver module
- pip_services3_messaging.queues.IMessageQueue module
- pip_services3_messaging.queues.IMessageReceiver module
- pip_services3_messaging.queues.LockedMessage module
- pip_services3_messaging.queues.MemoryMessageQueue module
- pip_services3_messaging.queues.MessageEnvelope module
- pip_services3_messaging.queues.MessageQueue module
- pip_services3_messaging.queues.MessagingCapabilities module
Module contents¶
pip_services3_messaging.queues.__init__¶
Queues module initialization
- copyright
Conceptual Vision Consulting LLC 2018-2019, see AUTHORS for more details.
- license
MIT, see LICENSE for more details.
-
class
pip_services3_messaging.queues.
IMessageQueue
¶ Bases:
pip_services3_commons.run.IOpenable.IOpenable
,pip_services3_commons.run.IClosable.IClosable
Interface for asynchronous message queues.
Not all queues may implement all the methods. Attempt to call non-supported method will result in NotImplemented exception. To verify if specific method is supported consult with
MessagingCapabilities
.See
MessagingCapabilities
,MessageEnvelope
-
abandon
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue.
- Parameters
message – a message to return.
-
begin_listen
(correlation_id: Optional[str], receiver: <module 'pip_services3_messaging.queues.IMessageReceiver' from '/pip_services3_messaging/queues/IMessageReceiver.py'>)¶ Listens for incoming messages without blocking the current thread.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
receiver – a receiver to receive incoming messages.
-
complete
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.
- Parameters
message – a message to remove.
-
end_listen
(correlation_id: Optional[str])¶ Ends listening for incoming messages. When this method is call
listen()
unblocks the thread and execution continues.- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
get_capabilities
() → <module ‘pip_services3_messaging.queues.MessagingCapabilities’ from ‘/pip_services3_messaging/queues/MessagingCapabilities.py’>¶ Gets the queue capabilities
- Returns
the queue’s capabilities object.
-
listen
(correlation_id: Optional[str], receiver: <module 'pip_services3_messaging.queues.IMessageReceiver' from '/pip_services3_messaging/queues/IMessageReceiver.py'>)¶ Listens for incoming messages and blocks the current thread until queue is closed.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
receiver – a receiver to receive incoming messages.
-
move_to_dead_letter
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Permanently removes a message from the queue and sends it to dead letter queue.
- Parameters
message – a message to be removed.
-
peek
(correlation_id: Optional[str]) → pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope¶ Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns null.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- Returns
a message object.
-
peek_batch
(correlation_id: Optional[str], message_count: int) → List[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]¶ Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
message_count – a maximum number of messages to peek.
- Returns
a list of message objects.
-
read_message_count
() → int¶ Reads the current number of messages in the queue to be delivered.
- Returns
a number of messages
-
receive
(correlation_id: Optional[str], wait_timeout: int) → pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope¶ Receives an incoming message and removes it from the queue.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
wait_timeout – a timeout in milliseconds to wait for a message to come.
- Returns
a message object.
-
renew_lock
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, lock_timeout: int)¶ Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.
- Parameters
message – a message to extend its lock.
lock_timeout – a locking timeout in milliseconds.
-
send
(correlation_id: Optional[str], envelop: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Sends a message into the queue.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
envelop – a message envelop to be sent.
-
send_as_object
(correlation_id: Optional[str], message_type: str, message: Any)¶ Sends an object into the queue. Before sending the object is converted into JSON string and wrapped in a
MessageEnvelope
.- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
message_type – a message type
message – an object value to be sent
-
-
class
pip_services3_messaging.queues.
IMessageReceiver
¶ Bases:
abc.ABC
Callback interface to receive incoming messages.
Example:
class MyMessageReceiver(IMessageReceiver): def receive_message(self, envelop, queue): print "Received message: " + envelop.get_message_as_string() messageQueue = MemoryMessageQueue() messageQueue.listen("123", MyMessageReceiver()) messageQueue.open("123") messageQueue.send("123", MessageEnvelope(None, "mymessage", "ABC")) # Output in console: "ABC"
-
receive_message
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, queue: <module 'pip_services3_messaging.queues.IMessageQueue' from '/pip_services3_messaging/queues/IMessageQueue.py'>)¶ Receives incoming message from the queue. :param message: an incoming message :param queue: a queue where the message comes from
-
-
class
pip_services3_messaging.queues.
MemoryMessageQueue
(name: str = None)¶ Bases:
pip_services3_messaging.queues.MessageQueue.MessageQueue
,pip_services3_commons.run.ICleanable.ICleanable
Message queue that sends and receives messages within the same process by using shared memory. This queue is typically used for testing to mock real queues.
- ### Configuration parameters ###
name: name of the message queue
- ### References ###
Example:
queue = MessageQueue("myqueue") queue.send("123", MessageEnvelope(None, "mymessage", "ABC")) message = queue.receive("123", 0) if message != None: # ... queue.complete("123", message)
-
abandon
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue.
- Parameters
message – a message to return.
-
clear
(correlation_id: Optional[str])¶ Clears component state.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
close
(correlation_id: Optional[str])¶ Closes component and frees used resources.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
complete
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.
- Parameters
message – a message to remove.
-
configure
(config: pip_services3_commons.config.ConfigParams.ConfigParams)¶ Configures component by passing configuration parameters.
- Parameters
config – configuration parameters to be set.
-
end_listen
(correlation_id)¶ Ends listening for incoming messages. When this method is call
listen()
unblocks the thread and execution continues.- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
is_open
()¶ Checks if the component is opened.
- Returns
true if the component has been opened and false otherwise.
-
listen
(correlation_id: Optional[str], receiver: pip_services3_messaging.queues.IMessageReceiver.IMessageReceiver)¶ Listens for incoming messages and blocks the current thread until queue is closed.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
receiver – a receiver to receive incoming messages.
-
move_to_dead_letter
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Permanently removes a message from the queue and sends it to dead letter queue.
- Parameters
message – a message to be removed.
-
peek
(correlation_id: Optional[str]) → pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope¶ Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns null.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- Returns
a message object.
-
peek_batch
(correlation_id: Optional[str], message_count: int) → List[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]¶ Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
message_count – a maximum number of messages to peek.
- Returns
a list of message objects.
-
read_message_count
() → int¶ Reads the current number of messages in the queue to be delivered.
- Returns
a number of messages
-
receive
(correlation_id: Optional[str], wait_timeout: int) → Optional[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]¶ Receives an incoming message and removes it from the queue.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
wait_timeout – a timeout in milliseconds to wait for a message to come.
- Returns
a message object.
-
renew_lock
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, lock_timeout: int)¶ Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.
- Parameters
message – a message to extend its lock.
lock_timeout – a locking timeout in milliseconds.
-
send
(correlation_id, message)¶ Sends a message into the queue.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
message – a message envelop to be sent.
-
class
pip_services3_messaging.queues.
MessageEnvelope
(correlation_id: Optional[str], message_type: Optional[str], message: Optional[Any])¶ Bases:
object
Allows adding additional information to messages. A correlation id, message id, and a message type are added to the data being sent/received. Additionally, a MessageEnvelope can reference a lock token.
Side note: a MessageEnvelope’s message is stored as a buffer, so strings are converted using utf8 conversions.
-
static
from_json
(value: str) → Optional[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]¶ Converts a JSON string into a MessageEnvelope The message payload is passed as base64 string
- Parameters
value – a JSON encoded string
- Returns
a decoded Message Envelop.
-
get_message_as
() → Any¶ TODO add description
- Returns
the value that was stored in this message as a JSON string.
-
get_message_as_string
() → Optional[str]¶ TODO add description
- Returns
the information stored in this message as a UTF-8 encoded string.
-
get_reference
() → Any¶ Gets a lock token reference for this MessageEnvelope.
- Returns
the lock token that this MessageEnvelope references.
-
set_message_as_object
(value: Any)¶ Stores the given value as a object.
- Parameters
value – the value to convert to JSON and store in this message.
-
set_message_as_string
(value: str)¶ Stores the given string.
- Parameters
value – the string to set. Will be converted to a buffer, using UTF-8 encoding.
-
set_reference
(value: Any)¶ Sets a lock token reference for this MessageEnvelope.
- Parameters
value – the lock token to reference.
-
static
-
class
pip_services3_messaging.queues.
MessageQueue
(name: str = None, capabilities: pip_services3_messaging.queues.MessagingCapabilities.MessagingCapabilities = None)¶ Bases:
pip_services3_commons.config.IConfigurable.IConfigurable
,pip_services3_commons.refer.IReferenceable.IReferenceable
,pip_services3_messaging.queues.IMessageQueue.IMessageQueue
Abstract message queue.
Abstract message queue that is used as a basis for specific message queue implementations.
- ### Configuration parameters ###
name: name of the message queue
- connection(s):
discovery_key: key to retrieve parameters from discovery service
protocol: connection protocol like http, https, tcp, udp
host: host name or IP address
port: port number
uri: resource URI or connection string with all parameters in it
credential(s):
store_key: key to retrieve parameters from credential store
username: user name
password: user password
access_id: application access id
access_key: application secret key
- ### References ###
*:logger:*:*:1.0 (optional)
ILogger
components to pass log messages*:counters:*:*:1.0 (optional)
ICounters
components to pass collected measurements*:discovery:*:*:1.0 (optional)
IDiscovery
components to discover connection(s)*:credential-store:*:*:1.0 (optional)
ICredentialStore
componetns to lookup credential(s)
-
abstract
abandon
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue.
- Parameters
message – a message to return.
-
begin_listen
(correlation_id: Optional[str], receiver: pip_services3_messaging.queues.IMessageReceiver.IMessageReceiver)¶ Listens for incoming messages without blocking the current thread.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
receiver – a receiver to receive incoming messages.
-
abstract
clear
(correlation_id: Optional[str])¶ Clears component state.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
abstract
close
(correlation_id: Optional[str])¶ Closes component and frees used resources.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
abstract
complete
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Permanently removes a message from the queue. This method is usually used to remove the message after successful processing. :param message: a message to remove.
-
configure
(config: pip_services3_commons.config.ConfigParams.ConfigParams)¶ Configures component by passing configuration parameters.
- Parameters
config – configuration parameters to be set.
-
abstract
end_listen
(correlation_id: Optional[str])¶ Ends listening for incoming messages. When this method is call listen unblocks the thread and execution continues.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
get_capabilities
() → pip_services3_messaging.queues.MessagingCapabilities.MessagingCapabilities¶ Gets the queue capabilities
- Returns
the queue’s capabilities object.
-
abstract
is_open
() → bool¶ Checks if the component is opened.
- Returns
true if the component has been opened and false otherwise.
-
abstract
listen
(correlation_id: Optional[str], receiver: pip_services3_messaging.queues.IMessageReceiver.IMessageReceiver)¶ Listens for incoming messages and blocks the current thread until queue is closed.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
receiver – a receiver to receive incoming messages.
-
abstract
move_to_dead_letter
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Permanently removes a message from the queue and sends it to dead letter queue.
- Parameters
message – a message to be removed.
-
open
(correlation_id: Optional[str])¶ Opens the component.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
-
abstract
peek
(correlation_id: Optional[str]) → pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope¶ Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns None.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- Returns
a peeked message or None.
-
abstract
peek_batch
(correlation_id: Optional[str], message_count: int) → List[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]¶ Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
message_count – a maximum number of messages to peek.
- Returns
a list of peeked messages
-
abstract
read_message_count
() → int¶ Reads the current number of messages in the queue to be delivered.
- Returns
a number of messages in the queue.
-
abstract
receive
(correlation_id: Optional[str], wait_timeout: int) → pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope¶ Receives an incoming message and removes it from the queue.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
wait_timeout – a timeout in milliseconds to wait for a message to come.
- Returns
a received message or None.
-
abstract
renew_lock
(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, lock_timeout: int)¶ Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.
- Parameters
message – a message to extend its lock.
lock_timeout – a locking timeout in milliseconds.
-
abstract
send
(correlation_id: Optional[str], envelop: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)¶ Sends a message into the queue. :param correlation_id: (optional) transaction id to trace execution through call chain. :param envelop: a message envelop to be sent.
-
send_as_object
(correlation_id: Optional[str], message_type: str, message: Any)¶ Sends an object into the queue. Before sending the object is converted into JSON string and wrapped in a
MessageEnvelope
.- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
message_type – a message type
message – an object value to be sent
-
set_references
(references: pip_services3_commons.refer.IReferences.IReferences)¶ Sets references to dependent components.
- Parameters
references – references to locate the component dependencies.
-
class
pip_services3_messaging.queues.
MessagingCapabilities
(message_count: bool, send: bool, receive: bool, peek: bool, peek_batch: bool, renew_lock: bool, abandon: bool, dead_letter: bool, clear: bool)¶ Bases:
object
Data object that contains supported capabilities of a message queue. If certain capability is not supported a queue will throw NotImplemented exception.
-
can_abandon
() → bool¶ Informs if the queue is able to abandon messages.
- Returns
true if queue is able to abandon.
-
can_dead_letter
() → bool¶ Informs if the queue is able to send messages to dead letter queue.
- Returns
true if queue is able to send messages to dead letter queue.
-
can_message_count
() → bool¶ Informs if the queue is able to read number of messages.
- Returns
true if queue supports reading message count.
-
can_peek
() → bool¶ Informs if the queue is able to peek messages.
- Returns
true if queue is able to peek messages.
-
can_peek_batch
() → bool¶ Informs if the queue is able to peek multiple messages in one batch.
- Returns
true if queue is able to peek multiple messages in one batch.
-
can_receive
() → bool¶ Informs if the queue is able to receive messages.
- Returns
true if queue is able to receive messages.
-