pip_services3_messaging.queues package

Module contents

pip_services3_messaging.queues.__init__

Queues module initialization

copyright

Conceptual Vision Consulting LLC 2018-2019, see AUTHORS for more details.

license

MIT, see LICENSE for more details.

class pip_services3_messaging.queues.IMessageQueue

Bases: pip_services3_commons.run.IOpenable.IOpenable, pip_services3_commons.run.IClosable.IClosable

Interface for asynchronous message queues.

Not all queues may implement all the methods. Attempt to call non-supported method will result in NotImplemented exception. To verify if specific method is supported consult with MessagingCapabilities.

See MessagingCapabilities, MessageEnvelope

abandon(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue.

Parameters

message – a message to return.

begin_listen(correlation_id: Optional[str], receiver: <module 'pip_services3_messaging.queues.IMessageReceiver' from '/pip_services3_messaging/queues/IMessageReceiver.py'>)

Listens for incoming messages without blocking the current thread.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • receiver – a receiver to receive incoming messages.

complete(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

Parameters

message – a message to remove.

end_listen(correlation_id: Optional[str])

Ends listening for incoming messages. When this method is call listen() unblocks the thread and execution continues.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

get_capabilities() → <module ‘pip_services3_messaging.queues.MessagingCapabilities’ from ‘/pip_services3_messaging/queues/MessagingCapabilities.py’>

Gets the queue capabilities

Returns

the queue’s capabilities object.

get_name()str

Gets the queue name

Returns

the queue name.

listen(correlation_id: Optional[str], receiver: <module 'pip_services3_messaging.queues.IMessageReceiver' from '/pip_services3_messaging/queues/IMessageReceiver.py'>)

Listens for incoming messages and blocks the current thread until queue is closed.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • receiver – a receiver to receive incoming messages.

move_to_dead_letter(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Permanently removes a message from the queue and sends it to dead letter queue.

Parameters

message – a message to be removed.

peek(correlation_id: Optional[str])pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns null.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

Returns

a message object.

peek_batch(correlation_id: Optional[str], message_count: int) → List[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • message_count – a maximum number of messages to peek.

Returns

a list of message objects.

read_message_count()int

Reads the current number of messages in the queue to be delivered.

Returns

a number of messages

receive(correlation_id: Optional[str], wait_timeout: int)pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope

Receives an incoming message and removes it from the queue.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • wait_timeout – a timeout in milliseconds to wait for a message to come.

Returns

a message object.

renew_lock(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, lock_timeout: int)

Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.

Parameters
  • message – a message to extend its lock.

  • lock_timeout – a locking timeout in milliseconds.

send(correlation_id: Optional[str], envelop: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Sends a message into the queue.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • envelop – a message envelop to be sent.

send_as_object(correlation_id: Optional[str], message_type: str, message: Any)

Sends an object into the queue. Before sending the object is converted into JSON string and wrapped in a MessageEnvelope.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • message_type – a message type

  • message – an object value to be sent

class pip_services3_messaging.queues.IMessageReceiver

Bases: abc.ABC

Callback interface to receive incoming messages.

Example:

class MyMessageReceiver(IMessageReceiver):
    def receive_message(self, envelop, queue):
        print "Received message: " + envelop.get_message_as_string()

messageQueue = MemoryMessageQueue()
messageQueue.listen("123", MyMessageReceiver())

messageQueue.open("123")
messageQueue.send("123", MessageEnvelope(None, "mymessage", "ABC")) # Output in console: "ABC"
receive_message(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, queue: <module 'pip_services3_messaging.queues.IMessageQueue' from '/pip_services3_messaging/queues/IMessageQueue.py'>)

Receives incoming message from the queue. :param message: an incoming message :param queue: a queue where the message comes from

See MessageEnvelope, IMessageQueue

class pip_services3_messaging.queues.MemoryMessageQueue(name: str = None)

Bases: pip_services3_messaging.queues.MessageQueue.MessageQueue, pip_services3_commons.run.ICleanable.ICleanable

Message queue that sends and receives messages within the same process by using shared memory. This queue is typically used for testing to mock real queues.

### Configuration parameters ###
  • name: name of the message queue

### References ###
  • *:logger:*:*:1.0 (optional) ILogger components to pass log messages

  • *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements

Example:

queue = MessageQueue("myqueue")
queue.send("123", MessageEnvelope(None, "mymessage", "ABC"))

message = queue.receive("123", 0)
if message != None:
    # ...
    queue.complete("123", message)
abandon(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue.

Parameters

message – a message to return.

clear(correlation_id: Optional[str])

Clears component state.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

close(correlation_id: Optional[str])

Closes component and frees used resources.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

complete(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

Parameters

message – a message to remove.

configure(config: pip_services3_commons.config.ConfigParams.ConfigParams)

Configures component by passing configuration parameters.

Parameters

config – configuration parameters to be set.

end_listen(correlation_id)

Ends listening for incoming messages. When this method is call listen() unblocks the thread and execution continues.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

is_open()

Checks if the component is opened.

Returns

true if the component has been opened and false otherwise.

listen(correlation_id: Optional[str], receiver: pip_services3_messaging.queues.IMessageReceiver.IMessageReceiver)

Listens for incoming messages and blocks the current thread until queue is closed.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • receiver – a receiver to receive incoming messages.

move_to_dead_letter(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Permanently removes a message from the queue and sends it to dead letter queue.

Parameters

message – a message to be removed.

peek(correlation_id: Optional[str])pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns null.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

Returns

a message object.

peek_batch(correlation_id: Optional[str], message_count: int) → List[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • message_count – a maximum number of messages to peek.

Returns

a list of message objects.

read_message_count()int

Reads the current number of messages in the queue to be delivered.

Returns

a number of messages

receive(correlation_id: Optional[str], wait_timeout: int) → Optional[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]

Receives an incoming message and removes it from the queue.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • wait_timeout – a timeout in milliseconds to wait for a message to come.

Returns

a message object.

renew_lock(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, lock_timeout: int)

Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.

Parameters
  • message – a message to extend its lock.

  • lock_timeout – a locking timeout in milliseconds.

send(correlation_id, message)

Sends a message into the queue.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • message – a message envelop to be sent.

class pip_services3_messaging.queues.MessageEnvelope(correlation_id: Optional[str], message_type: Optional[str], message: Optional[Any])

Bases: object

Allows adding additional information to messages. A correlation id, message id, and a message type are added to the data being sent/received. Additionally, a MessageEnvelope can reference a lock token.

Side note: a MessageEnvelope’s message is stored as a buffer, so strings are converted using utf8 conversions.

static from_json(value: str) → Optional[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]

Converts a JSON string into a MessageEnvelope The message payload is passed as base64 string

Parameters

value – a JSON encoded string

Returns

a decoded Message Envelop.

get_message_as() → Any

TODO add description

Returns

the value that was stored in this message as a JSON string.

get_message_as_string() → Optional[str]

TODO add description

Returns

the information stored in this message as a UTF-8 encoded string.

get_reference() → Any

Gets a lock token reference for this MessageEnvelope.

Returns

the lock token that this MessageEnvelope references.

set_message_as_object(value: Any)

Stores the given value as a object.

Parameters

value – the value to convert to JSON and store in this message.

set_message_as_string(value: str)

Stores the given string.

Parameters

value – the string to set. Will be converted to a buffer, using UTF-8 encoding.

set_reference(value: Any)

Sets a lock token reference for this MessageEnvelope.

Parameters

value – the lock token to reference.

to_json()dict

Converts this MessageEnvelope to a JSON string. The message payload is passed as base64 string

Returns

A JSON encoded representation is this object.

to_string()str

Convert’s this MessageEnvelope to a string, using the following format: `"[<correlation_id>,<message_type>,<message.toString>]"`.

If any of the values are `None`, they will be replaced with `---`.

Returns

the generated string.

class pip_services3_messaging.queues.MessageQueue(name: str = None, capabilities: pip_services3_messaging.queues.MessagingCapabilities.MessagingCapabilities = None)

Bases: pip_services3_commons.config.IConfigurable.IConfigurable, pip_services3_commons.refer.IReferenceable.IReferenceable, pip_services3_messaging.queues.IMessageQueue.IMessageQueue

Abstract message queue.

Abstract message queue that is used as a basis for specific message queue implementations.

### Configuration parameters ###
  • name: name of the message queue

  • connection(s):
    • discovery_key: key to retrieve parameters from discovery service

    • protocol: connection protocol like http, https, tcp, udp

    • host: host name or IP address

    • port: port number

    • uri: resource URI or connection string with all parameters in it

  • credential(s):

  • store_key: key to retrieve parameters from credential store

  • username: user name

  • password: user password

  • access_id: application access id

  • access_key: application secret key

### References ###
  • *:logger:*:*:1.0 (optional) ILogger components to pass log messages

  • *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements

  • *:discovery:*:*:1.0 (optional) IDiscovery components to discover connection(s)

  • *:credential-store:*:*:1.0 (optional) ICredentialStore componetns to lookup credential(s)

abstract abandon(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue.

Parameters

message – a message to return.

begin_listen(correlation_id: Optional[str], receiver: pip_services3_messaging.queues.IMessageReceiver.IMessageReceiver)

Listens for incoming messages without blocking the current thread.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • receiver – a receiver to receive incoming messages.

abstract clear(correlation_id: Optional[str])

Clears component state.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

abstract close(correlation_id: Optional[str])

Closes component and frees used resources.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

abstract complete(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing. :param message: a message to remove.

configure(config: pip_services3_commons.config.ConfigParams.ConfigParams)

Configures component by passing configuration parameters.

Parameters

config – configuration parameters to be set.

abstract end_listen(correlation_id: Optional[str])

Ends listening for incoming messages. When this method is call listen unblocks the thread and execution continues.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

get_capabilities()pip_services3_messaging.queues.MessagingCapabilities.MessagingCapabilities

Gets the queue capabilities

Returns

the queue’s capabilities object.

get_name()str

Gets the queue name

Returns

the queue name.

abstract is_open()bool

Checks if the component is opened.

Returns

true if the component has been opened and false otherwise.

abstract listen(correlation_id: Optional[str], receiver: pip_services3_messaging.queues.IMessageReceiver.IMessageReceiver)

Listens for incoming messages and blocks the current thread until queue is closed.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • receiver – a receiver to receive incoming messages.

abstract move_to_dead_letter(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Permanently removes a message from the queue and sends it to dead letter queue.

Parameters

message – a message to be removed.

open(correlation_id: Optional[str])

Opens the component.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

abstract peek(correlation_id: Optional[str])pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns None.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

Returns

a peeked message or None.

abstract peek_batch(correlation_id: Optional[str], message_count: int) → List[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • message_count – a maximum number of messages to peek.

Returns

a list of peeked messages

abstract read_message_count()int

Reads the current number of messages in the queue to be delivered.

Returns

a number of messages in the queue.

abstract receive(correlation_id: Optional[str], wait_timeout: int)pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope

Receives an incoming message and removes it from the queue.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • wait_timeout – a timeout in milliseconds to wait for a message to come.

Returns

a received message or None.

abstract renew_lock(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, lock_timeout: int)

Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.

Parameters
  • message – a message to extend its lock.

  • lock_timeout – a locking timeout in milliseconds.

abstract send(correlation_id: Optional[str], envelop: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Sends a message into the queue. :param correlation_id: (optional) transaction id to trace execution through call chain. :param envelop: a message envelop to be sent.

send_as_object(correlation_id: Optional[str], message_type: str, message: Any)

Sends an object into the queue. Before sending the object is converted into JSON string and wrapped in a MessageEnvelope.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • message_type – a message type

  • message – an object value to be sent

set_references(references: pip_services3_commons.refer.IReferences.IReferences)

Sets references to dependent components.

Parameters

references – references to locate the component dependencies.

to_string()str

Gets a string representation of the object.

Returns

a string representation of the object.

class pip_services3_messaging.queues.MessagingCapabilities(message_count: bool, send: bool, receive: bool, peek: bool, peek_batch: bool, renew_lock: bool, abandon: bool, dead_letter: bool, clear: bool)

Bases: object

Data object that contains supported capabilities of a message queue. If certain capability is not supported a queue will throw NotImplemented exception.

can_abandon()bool

Informs if the queue is able to abandon messages.

Returns

true if queue is able to abandon.

can_clear()bool

Informs if the queue can be cleared.

Returns

true if queue can be cleared.

can_dead_letter()bool

Informs if the queue is able to send messages to dead letter queue.

Returns

true if queue is able to send messages to dead letter queue.

can_message_count()bool

Informs if the queue is able to read number of messages.

Returns

true if queue supports reading message count.

can_peek()bool

Informs if the queue is able to peek messages.

Returns

true if queue is able to peek messages.

can_peek_batch()bool

Informs if the queue is able to peek multiple messages in one batch.

Returns

true if queue is able to peek multiple messages in one batch.

can_receive()bool

Informs if the queue is able to receive messages.

Returns

true if queue is able to receive messages.

can_renew_lock()bool

Informs if the queue is able to renew message lock.

Returns

true if queue is able to renew message lock.

can_send()bool

Informs if the queue is able to send messages.

Returns

true if queue is able to send messages.