pip_services3_kafka.queues package
Submodules
Module contents
- class pip_services3_kafka.queues.KafkaMessageQueue(name: Optional[str] = None)
Bases: pip_services3_messaging.queues.MessageQueue.MessageQueue, pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener, pip_services3_commons.refer.IUnreferenceable.IUnreferenceable, pip_services3_commons.run.IOpenable.IOpenable, pip_services3_commons.run.ICleanable.ICleanable
Message queue that sends and receives messages via Kafka message broker.
Kafka is a distributed event streaming platform commonly used for high-throughput messaging between services.
- ### Configuration parameters ###
topic: name of Kafka topic to subscribe
group_id: (optional) consumer group id (default: default)
from_beginning: (optional) restarts receiving messages from the beginning (default: false)
read_partitions: (optional) number of partitions to be consumed concurrently (default: 1)
autocommit: (optional) turns on/off autocommit (default: true)
connection(s):
- discovery_key: (optional) a key to retrieve the connection from IDiscovery
- host: host name or IP address
- port: port number
- uri: resource URI or connection string with all parameters in it
credential(s):
- store_key: (optional) a key to retrieve the credentials from ICredentialStore
- username: user name
- password: user password
options:
- autosubscribe: (optional) true to automatically subscribe on open (default: false)
- acks: (optional) control the number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
- log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
- connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
- max_retries: (optional) maximum retry attempts (default: 5)
- retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
- request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
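The configuration parameters are flat, dot-separated keys such as `connection.host` or `options.acks`. As a minimal sketch, assuming the settings are first kept in a nested dictionary, the following flattens them into the alternating key/value sequence that `ConfigParams.from_tuples` takes in the class example. The helper name `flatten_config` and all sample values are illustrative and not part of the library.

```python
from typing import Any, Dict, List


def flatten_config(section: Dict[str, Any], prefix: str = "") -> List[Any]:
    """Flatten a nested dict into [key1, value1, key2, value2, ...] with dotted keys."""
    tuples: List[Any] = []
    for key, value in section.items():
        full_key = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Descend into sub-sections such as "connection" or "options".
            tuples.extend(flatten_config(value, full_key))
        else:
            tuples.extend([full_key, value])
    return tuples


# Sample values only; the key names are the documented configuration parameters.
settings = {
    "topic": "mytopic",
    "group_id": "mygroup",
    "from_beginning": False,
    "connection": {"host": "localhost", "port": 9092},
    "credential": {"username": "user", "password": "secret"},
    "options": {"autosubscribe": True, "acks": -1, "max_retries": 5},
}

tuples = flatten_config(settings)
print(tuples[:4])
# With the library installed, the result could be applied as:
#   queue.configure(ConfigParams.from_tuples(*tuples))
```

Keeping the settings nested makes it easier to see which keys belong to the connection, credential and options groups before they are handed to `configure`.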
- ### References ###
*:logger:*:*:1.0 (optional) ILogger components to pass log messages
*:counters:*:*:1.0 (optional) ICounters components to pass collected measurements
*:discovery:*:*:1.0 (optional) IDiscovery services to resolve connection
*:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
*:connection:kafka:*:1.0 (optional) Shared connection to Kafka service
Example:
    from pip_services3_commons.config import ConfigParams
    from pip_services3_kafka.queues import KafkaMessageQueue
    from pip_services3_messaging.queues import MessageEnvelope

    queue = KafkaMessageQueue("myqueue")
    queue.configure(ConfigParams.from_tuples(
        "topic", "mytopic",
        "connection.protocol", "tcp",
        "connection.host", "localhost",
        "connection.port", 9092,
    ))

    queue.open("123")

    queue.send("123", MessageEnvelope(None, "mymessage", "ABC"))

    message = queue.receive("123", 10000)
    print(message)
    if message is not None:
        ...
        queue.complete(message)
- abandon(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)
Returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to a dead letter queue.
- Parameters
message – a message to return.
- clear(correlation_id: Optional[str])
Clears component state.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- close(correlation_id: Optional[str])
Closes component and frees used resources.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- complete(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)
Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.
- Parameters
message – a message to remove.
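The way `receive`, `complete` and `abandon` fit together can be sketched as follows. This is only an illustration: `InMemoryQueue` is a hypothetical stand-in that mimics the documented method signatures so the sketch runs without a broker, and `process_next` and `flaky_handler` are made-up names. It does not reproduce how the Kafka-backed queue redelivers messages.

```python
from collections import deque
from typing import Any, Callable, Deque, List, Optional, Set


class InMemoryQueue:
    """Hypothetical stand-in that mimics receive/complete/abandon without a broker."""

    def __init__(self) -> None:
        self._pending: Deque[Any] = deque()
        self.completed: List[Any] = []

    def send(self, correlation_id: Optional[str], envelop: Any) -> None:
        self._pending.append(envelop)

    def receive(self, correlation_id: Optional[str], wait_timeout: int) -> Optional[Any]:
        # A real queue would wait up to wait_timeout milliseconds; this one returns at once.
        return self._pending.popleft() if self._pending else None

    def complete(self, message: Any) -> None:
        self.completed.append(message)

    def abandon(self, message: Any) -> None:
        # Make the message available for another delivery attempt.
        self._pending.append(message)


def process_next(queue: Any, correlation_id: Optional[str],
                 handler: Callable[[Any], None], wait_timeout: int = 10000) -> str:
    """Receive one message; complete it on success, abandon it on a handler error."""
    message = queue.receive(correlation_id, wait_timeout)
    if message is None:
        return "empty"
    try:
        handler(message)
    except Exception:
        queue.abandon(message)
        return "abandoned"
    queue.complete(message)
    return "completed"


seen: Set[Any] = set()


def flaky_handler(message: Any) -> None:
    # Fails on the first delivery of each message to simulate a temporary error.
    if message not in seen:
        seen.add(message)
        raise RuntimeError("temporary failure")


queue = InMemoryQueue()
queue.send("123", "retry-me")
results = [process_next(queue, "123", flaky_handler) for _ in range(3)]
print(results)
```

A handler that fails for reasons that will never go away should not call `abandon` in a loop; such messages are better removed or routed elsewhere, as the description of `abandon` notes.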
- configure(config: pip_services3_commons.config.ConfigParams.ConfigParams)
Configures object by passing configuration parameters.
- Parameters
config – configuration parameters to be set.
- end_listen(correlation_id: Optional[str])
Ends listening for incoming messages. When this method is called, listen unblocks the thread and execution continues.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- is_open() bool
Checks if the component is opened.
- Returns
true if the component has been opened and false otherwise.
- listen(correlation_id: Optional[str], receiver: pip_services3_messaging.queues.IMessageReceiver.IMessageReceiver)
Listens for incoming messages and blocks the current thread until the queue is closed.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
receiver – a receiver to receive incoming messages.
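Because `listen` blocks the calling thread and `end_listen` unblocks it, one common arrangement is to run `listen` on a worker thread and stop it from the main thread. The sketch below illustrates that pattern under stated assumptions: `BlockingQueueStub` is a hypothetical stand-in for the real queue, its `delivered` event exists only to make the demo deterministic, and the receiver callback is assumed to be `receive_message(message, queue)`.

```python
import threading
from typing import Any, List, Optional


class CollectingReceiver:
    """Receiver that records every message passed to it.

    Assumes the receiver callback is receive_message(message, queue).
    """

    def __init__(self) -> None:
        self.messages: List[Any] = []

    def receive_message(self, message: Any, queue: Any) -> None:
        self.messages.append(message)


class BlockingQueueStub:
    """Hypothetical stand-in whose listen() blocks until end_listen() is called."""

    def __init__(self, backlog: List[Any]) -> None:
        self._backlog = list(backlog)
        self._stopped = threading.Event()
        self.delivered = threading.Event()  # stub-only signal, not part of the real API

    def listen(self, correlation_id: Optional[str], receiver: Any) -> None:
        for message in self._backlog:
            receiver.receive_message(message, self)
        self.delivered.set()
        self._stopped.wait()  # block the calling thread until end_listen() is called

    def end_listen(self, correlation_id: Optional[str]) -> None:
        self._stopped.set()


queue = BlockingQueueStub(["a", "b"])
receiver = CollectingReceiver()

# Run the blocking call on a worker thread so the main thread stays free.
worker = threading.Thread(target=queue.listen, args=("123", receiver), daemon=True)
worker.start()

queue.delivered.wait(timeout=5)  # wait until the stub has handed over its backlog
queue.end_listen("123")          # unblock listen()
worker.join(timeout=5)
print(receiver.messages, worker.is_alive())
```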
- move_to_dead_letter(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)
Permanently removes a message from the queue and sends it to the dead letter queue.
Important: This method is not supported by Kafka.
- Parameters
message – a message to be removed.
- on_message(topic: str, partition: int, message: cimpl.Message)
Defines the actions to be done after a message is received.
- Parameters
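topic – name of the topic the message was received from.
partition – number of the partition the message was received from.
message – the received Kafka message.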
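As a rough illustration of a callback with this signature, the sketch below reads the key, payload and headers from the message object. It relies on the `key()`, `value()` and `headers()` accessors of the confluent-kafka `Message` class; `StubMessage` and `RecordingListener` are hypothetical names used so the sketch runs without a broker. It does not show what `KafkaMessageQueue.on_message` itself does internally.

```python
from typing import Any, Dict, List, Optional, Tuple


class StubMessage:
    """Hypothetical stand-in exposing the accessor methods used below."""

    def __init__(self, key: Optional[bytes], value: Optional[bytes],
                 headers: Optional[List[Tuple[str, Optional[bytes]]]] = None) -> None:
        self._key, self._value, self._headers = key, value, headers

    def key(self) -> Optional[bytes]:
        return self._key

    def value(self) -> Optional[bytes]:
        return self._value

    def headers(self) -> Optional[List[Tuple[str, Optional[bytes]]]]:
        return self._headers


class RecordingListener:
    """Listener with an on_message(topic, partition, message) callback."""

    def __init__(self) -> None:
        self.received: List[Dict[str, Any]] = []

    def on_message(self, topic: str, partition: int, message: Any) -> None:
        # headers() may return None, and individual header values may be None.
        headers = {
            name: raw.decode("utf-8") if raw is not None else None
            for name, raw in (message.headers() or [])
        }
        payload = message.value()
        self.received.append({
            "topic": topic,
            "partition": partition,
            "key": message.key(),
            "text": payload.decode("utf-8") if payload is not None else None,
            "headers": headers,
        })


listener = RecordingListener()
listener.on_message("mytopic", 0, StubMessage(b"k1", b"mymessage", [("message_type", b"ABC")]))
print(listener.received[0])
```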
- open(correlation_id: Optional[str])
Opens the component.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- peek(correlation_id: Optional[str]) pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope
Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns None.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- Returns
a peeked message.
- peek_batch(correlation_id: Optional[str], message_count: int) List[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]
Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
message_count – a maximum number of messages to peek.
- Returns
a list with peeked messages.
- read_message_count() int
Reads the current number of messages in the queue to be delivered.
- Returns
a number of messages in the queue.
- receive(correlation_id: Optional[str], wait_timeout: int) pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope
Receives an incoming message and removes it from the queue.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
wait_timeout – a timeout in milliseconds to wait for a message to come.
- Returns
a received message.
- renew_lock(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, lock_timeout: int)
Renews a lock on a message that makes it invisible to other receivers in the queue. This method is usually used to extend the message processing time.
Important: This method is not supported by Kafka.
- Parameters
message – a message to extend its lock.
lock_timeout – a locking timeout in milliseconds.
- send(correlation_id: Optional[str], envelop: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)
Sends a message into the queue.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
envelop – a message envelope to be sent.
- set_references(references: pip_services3_commons.refer.IReferences.IReferences)
Sets references to dependent components.
- Parameters
references – references to locate the component dependencies.
- unset_references()
Unsets (clears) previously set references to dependent components.