pip_services3_kafka.connect package
Submodules
Module contents
- class pip_services3_kafka.connect.IKafkaMessageListener
Bases:
abc.ABC
The IKafkaMessageListener interface defines a Kafka message listener.
- abstract on_message(topic: str, partition: int, message: cimpl.Message)
Defines the actions to be done after a message is received.
- Parameters
topic – topic
partition – partition
message – message
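A listener is any object that implements on_message. The sketch below is a minimal, self-contained illustration rather than the library's own code: MessageListener is a local stand-in that mirrors the interface above so the example runs without a broker, and FakeMessage is a hypothetical stub exposing only the value() and offset() accessors of a Kafka message. In real code you would presumably subclass IKafkaMessageListener from pip_services3_kafka.connect instead.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class MessageListener(ABC):
    """Local stand-in mirroring IKafkaMessageListener (assumption, for illustration)."""

    @abstractmethod
    def on_message(self, topic: str, partition: int, message: Any) -> None:
        ...


class FakeMessage:
    """Hypothetical stub with the value()/offset() accessors of a Kafka message."""

    def __init__(self, value: bytes, offset: int):
        self._value = value
        self._offset = offset

    def value(self) -> bytes:
        return self._value

    def offset(self) -> int:
        return self._offset


class CollectingListener(MessageListener):
    """Stores every received message as (topic, partition, offset, text)."""

    def __init__(self) -> None:
        self.received: List[Tuple[str, int, int, str]] = []

    def on_message(self, topic: str, partition: int, message: Any) -> None:
        # Decode the raw payload and remember where it came from.
        text = message.value().decode("utf-8")
        self.received.append((topic, partition, message.offset(), text))


listener = CollectingListener()
listener.on_message("orders", 0, FakeMessage(b"hello", 42))
```

Keeping the listener free of connection logic means it can be exercised in unit tests with stub messages, as done here.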
- class pip_services3_kafka.connect.KafkaConnection
Bases:
pip_services3_messaging.connect.IMessageQueueConnection.IMessageQueueConnection,
pip_services3_commons.refer.IReferenceable.IReferenceable,
pip_services3_commons.config.IConfigurable.IConfigurable,
pip_services3_commons.run.IOpenable.IOpenable
Kafka connection using plain driver.
By defining a connection and sharing it across multiple message queues, you can reduce the number of broker connections in use.
- ### Configuration parameters ###
client_id: (optional) name of the client id
- connection(s):
discovery_key: (optional) a key to retrieve the connection from IDiscovery
host: host name or IP address
port: port number (default: 9092)
uri: resource URI or connection string with all parameters in it
- credential(s):
store_key: (optional) a key to retrieve the credentials from ICredentialStore
username: user name
password: user password
- options:
log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
max_retries: (optional) maximum retry attempts (default: 5)
retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
- ### References ###
*:logger:*:*:1.0 (optional) ILogger components to pass log messages
*:discovery:*:*:1.0 (optional) IDiscovery services to resolve connection
*:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
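The configuration parameters above form a nested structure with connection, credential and options sections. As a rough sketch, the snippet below writes one such configuration as a plain Python dict and flattens it into dotted keys (for example connection.host), the flat key style used by Pip.Services configuration parameters. The flatten helper and all values (client id, host, credentials) are hypothetical placeholders; in an actual application the parameters would be passed through ConfigParams to configure().

```python
from typing import Any, Dict


def flatten(section: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flattens nested dicts into dotted keys, e.g. {'a': {'b': 1}} -> {'a.b': 1}."""
    flat: Dict[str, Any] = {}
    for key, value in section.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


# Placeholder values only; parameter names follow the list above.
config = {
    "client_id": "my-service",
    "connection": {"host": "localhost", "port": 9092},
    "credential": {"username": "user", "password": "secret"},
    "options": {"connect_timeout": 1000, "max_retries": 5, "retry_timeout": 30000},
}

flat_config = flatten(config)
```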
- close(correlation_id: Optional[str])
Closes component and frees used resources.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- commit(topic: str, group_id: str, partition: int, offset: int, listener: pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener)
Commit a message offset.
- Parameters
topic – a topic name
group_id – (optional) a consumer group id
partition – a partition number
offset – a message offset
listener – a message listener
- configure(config: pip_services3_commons.config.ConfigParams.ConfigParams)
Configures component by passing configuration parameters.
- Parameters
config – configuration parameters to be set.
- create_queue(name: str)
Creates a message queue. If the connection doesn’t support this function, it exits without error.
- Parameters
name – the name of the queue to be created.
- delete_queue(name: str)
Deletes a message queue. If the connection doesn’t support this function, it exits without error.
- Parameters
name – the name of the queue to be deleted.
- get_connection() → Any
Gets the connection.
- get_producer() → cimpl.Producer
Gets the Kafka message producer object.
- is_open() → bool
Checks if the component is opened.
- Returns
True if the component has been opened and False otherwise.
- open(correlation_id: Optional[str])
Opens the component.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- publish(topic: str, messages: List[dict], options: dict)
Publishes messages to the specified topic.
- Parameters
topic – a topic where the message will be placed
messages – a list of messages to be published
options – publishing options
- read_queue_names() → List[str]
Reads a list of registered queue names. If the connection doesn’t support this function, it returns an empty list.
- Returns
queue names.
- seek(topic: str, group_id: str, partition: int, offset: int, listener: pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener)
Seek a message offset.
- Parameters
topic – a topic name
group_id – (optional) a consumer group id
partition – a partition number
offset – a message offset
listener – a message listener
- set_references(references: pip_services3_commons.refer.IReferences.IReferences)
Sets references to dependent components.
- Parameters
references – references to locate the component dependencies.
- subscribe(topic: str, group_id: str, options: dict, listener: pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener)
Subscribes to a topic.
- Parameters
topic – a topic (subject) name
group_id – (optional) consumer group id
options – subscription options
listener – message listener
- unsubscribe(topic: str, group_id: str, listener: pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener)
Unsubscribes from a previously subscribed topic.
- Parameters
topic – a topic name
group_id – (optional) a consumer group id
listener – a message listener
- class pip_services3_kafka.connect.KafkaConnectionResolver
Bases:
pip_services3_commons.refer.IReferenceable.IReferenceable,
pip_services3_commons.config.IConfigurable.IConfigurable
Helper class that resolves Kafka connection and credential parameters, validates them and generates connection options.
- ### Configuration parameters ###
- connection(s):
discovery_key: (optional) a key to retrieve the connection from IDiscovery
host: host name or IP address
port: port number
uri: resource URI or connection string with all parameters in it
- credential(s):
store_key: (optional) a key to retrieve the credentials from ICredentialStore
username: user name
password: user password
- ### References ###
*:discovery:*:*:1.0 (optional) IDiscovery services to resolve connection
*:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
- compose(correlation_id: Optional[str], connections: List[pip_services3_components.connect.ConnectionParams.ConnectionParams], credential: pip_services3_components.auth.CredentialParams.CredentialParams) → Any
Composes Kafka connection options from connection and credential parameters.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
connections – connection parameters
credential – credential parameters
- Returns
resolved Kafka connection options.
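To make the idea of composing options more concrete, here is a hedged sketch; it is not the library's implementation. It assumes that several connection entries are merged into one comma-separated broker list and that credentials, when present, are carried alongside. The function name compose_options, the result keys servers, username and password, and the fallback port 9092 (Kafka's conventional port) are all assumptions made for illustration.

```python
from typing import Dict, List, Optional


def compose_options(
    connections: List[Dict[str, object]],
    credential: Optional[Dict[str, str]] = None,
    default_port: int = 9092,
) -> Dict[str, str]:
    """Merges connection entries into one broker list and adds credentials."""
    servers = []
    for connection in connections:
        host = connection.get("host")
        if not host:
            # Validation step: every connection entry needs a host.
            raise ValueError("Connection host is not set")
        port = connection.get("port") or default_port
        servers.append(f"{host}:{port}")

    options = {"servers": ",".join(servers)}
    if credential:
        options["username"] = credential.get("username", "")
        options["password"] = credential.get("password", "")
    return options


options = compose_options(
    [{"host": "broker1", "port": 9092}, {"host": "broker2"}],
    {"username": "user", "password": "secret"},
)
```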
- configure(config: pip_services3_commons.config.ConfigParams.ConfigParams)
Configures component by passing configuration parameters.
- Parameters
config – configuration parameters to be set.
- resolve(correlation_id: Optional[str]) → pip_services3_commons.config.ConfigParams.ConfigParams
Resolves Kafka connection options from connection and credential parameters.
- Parameters
correlation_id – (optional) transaction id to trace execution through call chain.
- Returns
resolved Kafka connection options.
- set_references(references: pip_services3_commons.refer.IReferences.IReferences)
Sets references to dependent components.
- Parameters
references – references to locate the component dependencies.
- class pip_services3_kafka.connect.KafkaSubscription(topic: Optional[str] = None, group_id: Optional[str] = None, options: Optional[Any] = None, handler: Optional[cimpl.Consumer] = None, listener: Optional[pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener] = None)
Bases:
object
The KafkaSubscription class defines fields for Kafka subscriptions.
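The constructor signature suggests a plain record with five optional fields. The dataclass below is a local stand-in that mirrors those fields so the shape can be tried without installing the package. The name Subscription and the sample values are illustrative only; in the real class, handler is typed as cimpl.Consumer and listener as IKafkaMessageListener.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Subscription:
    """Local stand-in mirroring the fields of KafkaSubscription (assumption)."""

    topic: Optional[str] = None
    group_id: Optional[str] = None
    options: Optional[Any] = None
    handler: Optional[Any] = None   # a cimpl.Consumer in the real class
    listener: Optional[Any] = None  # an IKafkaMessageListener in the real class


# Every field defaults to None, so only the known values need to be supplied.
subscription = Subscription(topic="orders", group_id="billing", options={})
```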