pip_services3_kafka.queues package

Module contents

class pip_services3_kafka.queues.KafkaMessageQueue(name: Optional[str] = None)

Bases: pip_services3_messaging.queues.MessageQueue.MessageQueue, pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener, pip_services3_commons.refer.IUnreferenceable.IUnreferenceable, pip_services3_commons.run.IOpenable.IOpenable, pip_services3_commons.run.ICleanable.ICleanable

Message queue that sends and receives messages via a Kafka message broker.

Kafka is a distributed event streaming platform commonly used for high-throughput publish/subscribe messaging.

### Configuration parameters ###
  • topic: name of the Kafka topic to subscribe to

  • group_id: (optional) consumer group id (default: default)

  • from_beginning: (optional) restarts receiving messages from the beginning (default: false)

  • read_partitions: (optional) number of partitions to be consumed concurrently (default: 1)

  • autocommit: (optional) turns on/off autocommit (default: true)

  • connection(s):
      - discovery_key: (optional) a key to retrieve the connection from IDiscovery
      - host: host name or IP address
      - port: port number
      - uri: resource URI or connection string with all parameters in it

  • credential(s):
      - store_key: (optional) a key to retrieve the credentials from ICredentialStore
      - username: user name
      - password: user password

  • options:
      - autosubscribe: (optional) true to automatically subscribe on open (default: false)
      - acks: (optional) number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
      - log_level: (optional) log level: 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
      - connect_timeout: (optional) number of milliseconds to wait when connecting to the broker (default: 1000)
      - max_retries: (optional) maximum number of retry attempts (default: 5)
      - retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
      - request_timeout: (optional) number of milliseconds to wait when flushing messages (default: 30000)
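The dotted keys used in the example for this class (connection.host, connection.port) suggest how the nested parameters above flatten into configuration keys. The following is a minimal sketch in plain Python that merges the documented defaults with caller overrides. The helper names build_queue_config and to_tuples are hypothetical, and the options.* key spellings are an assumption inferred from that dotted pattern rather than taken from the library.

```python
from typing import Any, Dict, List

# Defaults copied from the parameter list above.
# The "options." prefix is an assumed spelling, not confirmed by the library.
DOCUMENTED_DEFAULTS: Dict[str, Any] = {
    "group_id": "default",
    "from_beginning": False,
    "read_partitions": 1,
    "autocommit": True,
    "options.autosubscribe": False,
    "options.acks": -1,                # -1 = all, 0 = none, 1 = only leader
    "options.log_level": 1,            # 0 = None ... 4 = Debug
    "options.connect_timeout": 1000,   # milliseconds
    "options.max_retries": 5,
    "options.retry_timeout": 30000,    # milliseconds
    "options.request_timeout": 30000,  # milliseconds
}


def build_queue_config(topic: str, host: str, port: int, **overrides: Any) -> Dict[str, Any]:
    """Hypothetical helper: documented defaults plus caller overrides.

    Keyword overrides use a double underscore in place of the dot,
    so options__acks=1 becomes the key "options.acks".
    """
    config = dict(DOCUMENTED_DEFAULTS)
    config.update({"topic": topic, "connection.host": host, "connection.port": port})
    for key, value in overrides.items():
        config[key.replace("__", ".")] = value
    return config


def to_tuples(config: Dict[str, Any]) -> List[Any]:
    """Flatten the mapping into a key, value, key, value, ... sequence."""
    flat: List[Any] = []
    for key, value in config.items():
        flat.extend([key, value])
    return flat


settings = build_queue_config("mytopic", "localhost", 9092,
                              options__acks=1, group_id="workers")
```

Under these assumptions, the flattened sequence could be passed on as ConfigParams.from_tuples(*to_tuples(settings)), the same call the class example uses.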

### References ###
  • *:logger:*:*:1.0 (optional) ILogger components to pass log messages

  • *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements

  • *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connection

  • *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials

  • *:connection:kafka:*:1.0 (optional) Shared connection to Kafka service
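Each locator above is a five-part descriptor. In Pip.Services the parts are group, type, kind, name and version, and `*` matches any value. The sketch below imitates that matching rule in plain Python to show which components a locator would select. The functions parse and matches are hypothetical stand-ins, not the library's own Descriptor class, and the sample component descriptors are illustrative values.

```python
from typing import List

FIELDS = ("group", "type", "kind", "name", "version")


def parse(descriptor: str) -> List[str]:
    """Split a descriptor string into its five colon-separated parts."""
    parts = descriptor.split(":")
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} parts, got {len(parts)}: {descriptor!r}")
    return parts


def matches(locator: str, descriptor: str) -> bool:
    """Two descriptors match when every field is equal or either side is '*'."""
    return all(
        want == "*" or have == "*" or want == have
        for want, have in zip(parse(locator), parse(descriptor))
    )


# Illustrative component descriptors (assumed values).
shared_kafka = "pip-services:connection:kafka:default:1.0"
console_logger = "pip-services:logger:console:default:1.0"

kafka_selected = matches("*:connection:kafka:*:1.0", shared_kafka)
logger_selected = matches("*:connection:kafka:*:1.0", console_logger)
```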

Example:

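from pip_services3_commons.config import ConfigParams
from pip_services3_kafka.queues import KafkaMessageQueue
from pip_services3_messaging.queues import MessageEnvelope

queue = KafkaMessageQueue("myqueue")
queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic",
    "connection.protocol", "tcp",
    "connection.host", "localhost",
    "connection.port", 9092,
))

queue.open("123")
# MessageEnvelope arguments: correlation id, message type, message payload
queue.send("123", MessageEnvelope(None, "mymessage", "ABC"))
# Wait up to 10000 milliseconds for a message
message = queue.receive("123", 10000)

print(message)

if message is not None:
    ...
    queue.complete(message)

abandon(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)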

Returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment, so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to a dead letter queue.

Parameters

message – a message to return.

clear(correlation_id: Optional[str])

Clears component state.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

close(correlation_id: Optional[str])

Closes component and frees used resources.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

complete(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

Parameters

message – a message to remove.
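The abandon and complete methods are typically paired around a processing step. The sketch below shows that pattern using only the receive, complete and abandon calls documented on this page. InMemoryQueue is a stand-in so the sketch runs without a broker and is not part of pip_services3_kafka. Treating a None result from receive as "nothing arrived" is an assumption based on the class example, and abandoning on every exception is a simplification: unrecoverable errors should be handled separately.

```python
from typing import Any, Callable, List, Optional


def process_next(queue: Any, correlation_id: Optional[str],
                 handler: Callable[[Any], None], wait_timeout: int = 10000) -> str:
    """Receive one message, run the handler, then complete or abandon it."""
    message = queue.receive(correlation_id, wait_timeout)
    if message is None:
        return "empty"
    try:
        handler(message)
    except Exception:
        # Processing failed: return the message so it can be retried.
        queue.abandon(message)
        return "abandoned"
    # Processing succeeded: remove the message permanently.
    queue.complete(message)
    return "completed"


class InMemoryQueue:
    """Stand-in queue for demonstration only."""

    def __init__(self, messages: List[Any]) -> None:
        self.pending = list(messages)
        self.completed: List[Any] = []

    def receive(self, correlation_id: Optional[str], wait_timeout: int) -> Any:
        return self.pending.pop(0) if self.pending else None

    def complete(self, message: Any) -> None:
        self.completed.append(message)

    def abandon(self, message: Any) -> None:
        self.pending.append(message)


def handler(message: Any) -> None:
    if message == "bad":
        raise ValueError("cannot process right now")


demo = InMemoryQueue(["good", "bad"])
results = [process_next(demo, "123", handler) for _ in range(2)]
```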

configure(config: pip_services3_commons.config.ConfigParams.ConfigParams)

Configures object by passing configuration parameters.

Parameters

config – configuration parameters to be set.

end_listen(correlation_id: Optional[str])

Ends listening for incoming messages. When this method is called, listen unblocks the thread and execution continues.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

is_open() → bool

Checks if the component is opened.

Returns

True if the component has been opened and False otherwise.

listen(correlation_id: Optional[str], receiver: pip_services3_messaging.queues.IMessageReceiver.IMessageReceiver)

Listens for incoming messages and blocks the current thread until the queue is closed.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • receiver – a receiver to receive incoming messages.
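Because listen blocks the calling thread until end_listen is called or the queue is closed, it is often run on a worker thread. The sketch below illustrates that arrangement. FakeListeningQueue only imitates the blocking behaviour described here and is not the Kafka queue, and the receiver method name receive_message(message, queue) is an assumption about the IMessageReceiver interface.

```python
import threading
from typing import Any, List, Optional


class CollectingReceiver:
    """Receiver shaped like IMessageReceiver (method name assumed)."""

    def __init__(self) -> None:
        self.seen: List[Any] = []

    def receive_message(self, message: Any, queue: Any) -> None:
        self.seen.append(message)
        queue.complete(message)


class FakeListeningQueue:
    """Stand-in that imitates listen/end_listen without a broker."""

    def __init__(self, messages: List[Any]) -> None:
        self._messages = list(messages)
        self._stop = threading.Event()
        self.completed: List[Any] = []

    def listen(self, correlation_id: Optional[str], receiver: Any) -> None:
        for message in self._messages:
            receiver.receive_message(message, self)
        self._stop.wait()  # block until end_listen is called

    def end_listen(self, correlation_id: Optional[str]) -> None:
        self._stop.set()

    def complete(self, message: Any) -> None:
        self.completed.append(message)


def listen_in_background(queue: Any, correlation_id: Optional[str],
                         receiver: Any) -> threading.Thread:
    """Run the blocking listen call on a daemon thread and return the thread."""
    worker = threading.Thread(target=queue.listen,
                              args=(correlation_id, receiver), daemon=True)
    worker.start()
    return worker


queue = FakeListeningQueue(["m1", "m2"])
receiver = CollectingReceiver()
worker = listen_in_background(queue, "123", receiver)
queue.end_listen("123")   # unblocks listen
worker.join(timeout=5)
```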

move_to_dead_letter(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Permanently removes a message from the queue and sends it to the dead letter queue.

Important: This method is not supported by Kafka.

Parameters

message – a message to be removed.

on_message(topic: str, partition: int, message: cimpl.Message)

Defines the actions to be done after a message is received.

Parameters
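  • topic – name of the topic the message was received from.

  • partition – number of the partition the message was received from.

  • message – the received Kafka message.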
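The message argument is a confluent_kafka message object (cimpl.Message), which exposes its content through accessor methods such as value(), key(), offset() and headers(). The sketch below shows one way a callback with this signature might read those fields. The function summarize and the StubMessage class are hypothetical and exist only so the sketch runs without a broker; this is not how the queue itself converts messages.

```python
from typing import Any, Dict


def summarize(topic: str, partition: int, message: Any) -> Dict[str, Any]:
    """Collect the basic fields of a Kafka message into a dictionary."""
    raw = message.value()
    key = message.key()
    return {
        "topic": topic,
        "partition": partition,
        "offset": message.offset(),
        "key": key.decode("utf-8") if isinstance(key, bytes) else key,
        "payload": raw.decode("utf-8") if isinstance(raw, bytes) else raw,
        # headers() yields a list of (name, value) pairs, or None when absent
        "headers": dict(message.headers() or []),
    }


class StubMessage:
    """Stand-in exposing the same accessor names as cimpl.Message."""

    def value(self) -> bytes:
        return b'{"id": 1}'

    def key(self) -> bytes:
        return b"order-1"

    def offset(self) -> int:
        return 42

    def headers(self):
        return [("message_type", b"mymessage")]


info = summarize("mytopic", 0, StubMessage())
```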

open(correlation_id: Optional[str])

Opens the component.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.
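Since open acquires resources that close later frees, it can help to pair the two calls so the component is closed even when processing fails. The sketch below wraps them in a context manager using only the open, close and is_open methods documented on this page. The helper opened and the StubComponent class are hypothetical and not part of the library.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Optional


@contextmanager
def opened(component: Any, correlation_id: Optional[str] = None) -> Iterator[Any]:
    """Open the component, yield it, and always close it afterwards."""
    component.open(correlation_id)
    try:
        yield component
    finally:
        component.close(correlation_id)


class StubComponent:
    """Stand-in with the same open/close/is_open surface, for demonstration."""

    def __init__(self) -> None:
        self._opened = False

    def open(self, correlation_id: Optional[str]) -> None:
        self._opened = True

    def close(self, correlation_id: Optional[str]) -> None:
        self._opened = False

    def is_open(self) -> bool:
        return self._opened


component = StubComponent()
with opened(component, "123") as active:
    was_open = active.is_open()
```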

peek(correlation_id: Optional[str]) → pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns None.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

Returns

a peeked message.

peek_batch(correlation_id: Optional[str], message_count: int) → List[pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope]

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • message_count – a maximum number of messages to peek.

Returns

a list with peeked messages.

read_message_count() → int

Reads the current number of messages in the queue to be delivered.

Returns

the number of messages in the queue.

receive(correlation_id: Optional[str], wait_timeout: int) → pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope

Receives an incoming message and removes it from the queue.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • wait_timeout – a timeout in milliseconds to wait for a message to come.

Returns

a received message.

renew_lock(message: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope, lock_timeout: int)

Renews a lock on a message, which keeps it invisible to other receivers in the queue. This method is usually used to extend the message processing time.

Important: This method is not supported by Kafka.

Parameters
  • message – the message whose lock should be extended.

  • lock_timeout – a locking timeout in milliseconds.

send(correlation_id: Optional[str], envelop: pip_services3_messaging.queues.MessageEnvelope.MessageEnvelope)

Sends a message into the queue.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • envelop – a message envelope to be sent.

set_references(references: pip_services3_commons.refer.IReferences.IReferences)

Sets references to dependent components.

Parameters

references – references to locate the component dependencies.

unset_references()

Unsets (clears) previously set references to dependent components.