pip_services3_kafka.connect package

Module contents

class pip_services3_kafka.connect.IKafkaMessageListener

Bases: abc.ABC

The IKafkaMessageListener interface defines a Kafka message listener.

abstract on_message(topic: str, partition: int, message: cimpl.Message)

Defines the actions to be done after a message is received.

Parameters
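  • topic – name of the topic the message was received from

  • partition – number of the partition the message was read from

  • message – the received Kafka message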
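A minimal sketch of a listener, assuming nothing beyond the `on_message` signature documented above. To stay runnable without a broker it uses a local stand-in base class (`MessageListener`) and a test double (`FakeMessage`); both are hypothetical. In real code the listener would subclass `IKafkaMessageListener` and receive a `confluent_kafka` message, whose payload is read with `value()`.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class MessageListener(ABC):
    """Local stand-in mirroring the documented IKafkaMessageListener interface."""

    @abstractmethod
    def on_message(self, topic: str, partition: int, message: Any) -> None:
        ...


class CollectingListener(MessageListener):
    """Hypothetical listener that records the payload of every message it receives."""

    def __init__(self) -> None:
        self.received: List[Tuple[str, int, bytes]] = []

    def on_message(self, topic: str, partition: int, message: Any) -> None:
        # Only value() is used here; the real message object exposes it as well.
        self.received.append((topic, partition, message.value()))


class FakeMessage:
    """Test double exposing only value(), the single accessor used above."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def value(self) -> bytes:
        return self._payload


listener = CollectingListener()
listener.on_message("orders", 0, FakeMessage(b'{"id": 1}'))
print(listener.received)
```

Keeping the listener free of connection logic makes it easy to exercise with a test double like this before wiring it to a live subscription.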

class pip_services3_kafka.connect.KafkaConnection

Bases: pip_services3_messaging.connect.IMessageQueueConnection.IMessageQueueConnection, pip_services3_commons.refer.IReferenceable.IReferenceable, pip_services3_commons.config.IConfigurable.IConfigurable, pip_services3_commons.run.IOpenable.IOpenable

Kafka connection using plain driver.

By defining a connection and sharing it across multiple message queues, you can reduce the number of connections opened to the Kafka brokers.

### Configuration parameters ###
  • client_id: (optional) client id used to identify this connection

  • connection(s):
    • discovery_key: (optional) a key to retrieve the connection from IDiscovery

    • host: host name or IP address

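    • port: port number (documented default: 27017, which is MongoDB's port; Kafka brokers conventionally listen on 9092, so set it explicitly)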

    • uri: resource URI or connection string with all parameters in it

  • credential(s):
    • store_key: (optional) a key to retrieve the credentials from ICredentialStore

    • username: user name

    • password: user password

  • options:
    • log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)

    • connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)

    • max_retries: (optional) maximum retry attempts (default: 5)

    • retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)

    • request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
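The parameters above are addressed with dotted keys such as `connection.host` or `options.max_retries`. The sketch below flattens a nested settings dictionary into that form. The `flatten` helper and the sample values (client id, host, credentials) are illustrative assumptions, not part of the package.

```python
from typing import Any, Dict


def flatten(section: Dict[str, Any], prefix: str = "") -> Dict[str, str]:
    """Flatten a nested dict into dotted keys such as 'connection.host'."""
    flat: Dict[str, str] = {}
    for key, value in section.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = str(value)
    return flat


settings = {
    "client_id": "orders-service",  # hypothetical client id
    "connection": {"host": "localhost", "port": 9092},
    "credential": {"username": "user", "password": "secret"},
    "options": {"connect_timeout": 1000, "max_retries": 5, "retry_timeout": 30000},
}

flat_config = flatten(settings)
for key in sorted(flat_config):
    print(f"{key} = {flat_config[key]}")
```

The resulting key/value pairs are the shape that `ConfigParams` expects (for example via `ConfigParams.from_tuples`), and the resulting object is what `configure` receives.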

### References ###
  • *:logger:*:*:1.0 (optional) ILogger components to pass log messages

  • *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connection

  • *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials

close(correlation_id: Optional[str])

Closes component and frees used resources.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

commit(topic: str, group_id: str, partition: int, offset: int, listener: pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener)

Commits the offset of a message.

Parameters
  • topic – a topic name

  • group_id – (optional) a consumer group id

  • partition – a partition number

  • offset – a message offset

  • listener – a message listener

configure(config: pip_services3_commons.config.ConfigParams.ConfigParams)

Configures component by passing configuration parameters.

Parameters

config – configuration parameters to be set.

create_queue(name: str)

Creates a message queue. If the connection doesn’t support this function, it exits without error.

Parameters

name – the name of the queue to be created.

delete_queue(name: str)

Deletes a message queue. If the connection doesn’t support this function, it exits without error.

Parameters

name – the name of the queue to be deleted.

get_connection() → Any

Gets the connection.

get_producer() → cimpl.Producer

Gets the Kafka message producer object.

is_open() → bool

Checks if the component is opened.

Returns

True if the component has been opened and False otherwise.

open(correlation_id: Optional[str])

Opens the component.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

publish(topic: str, messages: List[dict], options: dict)

Publishes a list of messages to the specified topic.

Parameters
  • topic – a topic where the message will be placed

  • messages – a list of messages to be published

  • options – publishing options
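The signature above only says that `messages` is a list of dictionaries, so the sketch below has to guess their layout. It assumes each message carries a `key` and a `value` as bytes; that layout, the `to_kafka_messages` helper and the sample events are hypothetical and should be checked against the package source before use.

```python
import json
from typing import Any, Dict, List


def to_kafka_messages(events: List[Dict[str, Any]], key_field: str = "id") -> List[dict]:
    """Turn plain events into message dicts; the 'key'/'value' layout is an assumption."""
    messages = []
    for event in events:
        messages.append({
            "key": str(event[key_field]).encode("utf-8"),
            "value": json.dumps(event, sort_keys=True).encode("utf-8"),
        })
    return messages


batch = to_kafka_messages([{"id": 1, "total": 9.5}, {"id": 2, "total": 12.0}])
print(batch[0])
# With an opened connection the batch would then be passed along the lines of:
#   connection.publish("orders", batch, {})
```

Serializing with `sort_keys=True` keeps the payload bytes stable across runs, which helps when comparing messages in tests.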

read_queue_names() → List[str]

Reads a list of registered queue names. If the connection doesn’t support this function, it returns an empty list.

Returns

queue names.

seek(topic: str, group_id: str, partition: int, offset: int, listener: pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener)

Seeks to a message offset.

Parameters
  • topic – a topic name

  • group_id – (optional) a consumer group id

  • partition – a partition number

  • offset – a message offset

  • listener – a message listener

set_references(references: pip_services3_commons.refer.IReferences.IReferences)

Sets references to dependent components.

Parameters

references – references to locate the component dependencies.

subscribe(topic: str, group_id: str, options: dict, listener: pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener)

Subscribes to a topic.

Parameters
  • topic – a topic name

  • group_id – (optional) consumer group id

  • options – subscription options

  • listener – message listener

unsubscribe(topic: str, group_id: str, listener: pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener)

Unsubscribes from a previously subscribed topic.

Parameters
  • topic – a topic name

  • group_id – (optional) a consumer group id

  • listener – a message listener
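The methods documented above suggest a lifecycle of open, subscribe, publish, unsubscribe and close. The sketch below walks through that order with an in-memory stand-in, so it runs without a broker. `InMemoryConnection` and `RecordingListener` are hypothetical and only mimic the documented method names; the real connection delivers broker messages asynchronously, not plain dictionaries.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple


class InMemoryConnection:
    """Hypothetical stand-in that mimics the documented method names, with no broker."""

    def __init__(self) -> None:
        self._opened = False
        self._listeners: Dict[str, List[Tuple[Optional[str], Any]]] = defaultdict(list)

    def open(self, correlation_id: Optional[str]) -> None:
        self._opened = True

    def close(self, correlation_id: Optional[str]) -> None:
        self._listeners.clear()
        self._opened = False

    def is_open(self) -> bool:
        return self._opened

    def subscribe(self, topic: str, group_id: Optional[str], options: dict, listener: Any) -> None:
        self._listeners[topic].append((group_id, listener))

    def unsubscribe(self, topic: str, group_id: Optional[str], listener: Any) -> None:
        self._listeners[topic] = [
            entry for entry in self._listeners[topic] if entry != (group_id, listener)
        ]

    def publish(self, topic: str, messages: List[dict], options: dict) -> None:
        # Deliver synchronously; a real broker assigns partitions and offsets.
        for _, listener in list(self._listeners[topic]):
            for message in messages:
                listener.on_message(topic, 0, message)


class RecordingListener:
    """Hypothetical listener that keeps every message it is handed."""

    def __init__(self) -> None:
        self.seen: List[dict] = []

    def on_message(self, topic: str, partition: int, message: Any) -> None:
        self.seen.append(message)


connection = InMemoryConnection()
listener = RecordingListener()

connection.open("trace-1")
connection.subscribe("orders", "billing", {}, listener)
connection.publish("orders", [{"value": b"first"}], {})
connection.unsubscribe("orders", "billing", listener)
connection.publish("orders", [{"value": b"second"}], {})  # no longer delivered
connection.close("trace-1")
print(listener.seen)
```

Passing the same listener object to `unsubscribe` that was given to `subscribe` is what lets the stand-in find and remove the subscription; the documented signatures take the listener in both calls, which suggests the real connection matches on it too.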

class pip_services3_kafka.connect.KafkaConnectionResolver

Bases: pip_services3_commons.refer.IReferenceable.IReferenceable, pip_services3_commons.config.IConfigurable.IConfigurable

Helper class that resolves Kafka connection and credential parameters, validates them and generates connection options.

### Configuration parameters ###
  • connection(s):
    • discovery_key: (optional) a key to retrieve the connection from IDiscovery

    • host: host name or IP address

    • port: port number

    • uri: resource URI or connection string with all parameters in it

  • credential(s):
    • store_key: (optional) a key to retrieve the credentials from ICredentialStore

    • username: user name

    • password: user password

### References ###
  • *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connection

  • *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials

compose(correlation_id: Optional[str], connections: List[pip_services3_components.connect.ConnectionParams.ConnectionParams], credential: pip_services3_components.auth.CredentialParams.CredentialParams) → Any

Composes Kafka connection options from connection and credential parameters.

Parameters
  • correlation_id – (optional) transaction id to trace execution through call chain.

  • connections – connection parameters

  • credential – credential parameters

Returns

resolved Kafka connection options.
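The exact shape of the composed options is not documented here. As a rough illustration of what composing options involves, the sketch below joins several host/port pairs into one broker list and attaches credentials. The `compose_options` function and its plain-dictionary inputs are hypothetical; the output keys follow librdkafka's property names (`bootstrap.servers`, `sasl.username`, `sasl.password`), which may differ from what this class actually returns.

```python
from typing import Dict, List, Optional


def compose_options(connections: List[Dict[str, object]],
                    credential: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Join host:port pairs into a broker list and attach credentials if present."""
    brokers = []
    for connection in connections:
        host = connection.get("host")
        port = connection.get("port")
        if not host or not port:
            raise ValueError("each connection needs both a host and a port")
        brokers.append(f"{host}:{port}")

    options = {"bootstrap.servers": ",".join(brokers)}
    if credential and credential.get("username"):
        options["sasl.username"] = credential["username"]
        options["sasl.password"] = credential.get("password", "")
    return options


options = compose_options(
    [{"host": "broker-1", "port": 9092}, {"host": "broker-2", "port": 9092}],
    {"username": "user", "password": "secret"},
)
print(options)
```

Validating host and port before joining mirrors the "validates them" step in the class description: a missing value fails early instead of producing a malformed broker list.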

configure(config: pip_services3_commons.config.ConfigParams.ConfigParams)

Configures component by passing configuration parameters.

Parameters

config – configuration parameters to be set.

resolve(correlation_id: Optional[str]) → pip_services3_commons.config.ConfigParams.ConfigParams

Resolves Kafka connection options from connection and credential parameters.

Parameters

correlation_id – (optional) transaction id to trace execution through call chain.

Returns

resolved Kafka connection options.

set_references(references: pip_services3_commons.refer.IReferences.IReferences)

Sets references to dependent components.

Parameters

references – references to locate the component dependencies.

class pip_services3_kafka.connect.KafkaSubscription(topic: Optional[str] = None, group_id: Optional[str] = None, options: Optional[Any] = None, handler: Optional[cimpl.Consumer] = None, listener: Optional[pip_services3_kafka.connect.IKafkaMessageListener.IKafkaMessageListener] = None)

Bases: object

The KafkaSubscription class defines fields for Kafka subscriptions.