Kafka integration for IBM Streams topology applications

Overview

This module allows a Streams application to subscribe to Kafka topics as a stream and to publish messages on Kafka topics from a stream of tuples. To achieve this, the module provides the KafkaConsumer and KafkaProducer classes. The KafkaConsumer is the source of a stream, and the KafkaProducer acts as a sink of a data stream.

Connection to a Kafka broker

Kafka consumers and producers are configured by using a consumer or producer configuration. Apart from connection information, like hostnames, TCP ports, and security settings, these configurations can also contain settings that affect the behaviour of the consumer or producer client. For this reason, the configurations can be specific to consumers or producers. In almost all cases it is sufficient to configure only the connection-specific information.

Kafka configurations can be stored in application configurations, which are named collections of key-value pairs. The config parameter of the KafkaConsumer and KafkaProducer classes takes either a str argument, which is interpreted as the name of an application configuration, or a dict argument, which must then be a valid consumer or producer configuration.

The minimum set of properties in the application configuration or dictionary contains bootstrap.servers, for example:

config               value
bootstrap.servers    host1:port1,host2:port2,host3:port3

Other configs for Kafka consumers or Kafka producers can be added to the application configuration or dictionary. When configurations are specified that are specific to consumers or producers, it is recommended to use separate application configurations or dict variables for KafkaConsumer and KafkaProducer.
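For illustration, a minimal sketch of both forms of the config parameter; the application configuration name 'myConsumerConfig' is an assumption and must already exist in the Streams instance:

from streamsx.kafka import KafkaConsumer
from streamsx.topology.schema import CommonSchema

# dict form: the dictionary itself is the consumer configuration
consumer = KafkaConsumer(config={'bootstrap.servers': 'host1:port1,host2:port2,host3:port3'},
                         topic='MY_TOPIC',
                         schema=CommonSchema.String)

# str form: the name of an existing application configuration
# (assumed to be 'myConsumerConfig') that holds the consumer configs
consumer_from_appconfig = KafkaConsumer(config='myConsumerConfig',
                                        topic='MY_TOPIC',
                                        schema=CommonSchema.String)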

The consumer and producer configs can be found in the Kafka documentation.

Please note that the underlying SPL toolkit already adjusts several configurations. Please review the toolkit operator reference for defaults and adjusted configurations.

Connection with the IBM Event Streams cloud service

The IBM Event Streams cloud service is a fully managed Kafka service. To connect with it, service credentials must be used. The function create_connection_properties_for_eventstreams() creates the Kafka configuration from the service credentials:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.kafka import KafkaConsumer, create_connection_properties_for_eventstreams

# assume the service credentials are stored in the file /tmp/eventstreams.json
consumer = KafkaConsumer(
    config=create_connection_properties_for_eventstreams('/tmp/eventstreams.json'),
    topic='MY_TOPIC',
    schema=CommonSchema.Json)
topology = Topology()
kafka_stream = topology.source(consumer)

Connection examples

Simple connection parameter example:

from streamsx.kafka import KafkaConsumer
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema

consumerProperties = dict()
consumerProperties['bootstrap.servers'] = 'kafka-host1.domain:9092,kafka-host2.domain:9092'
consumerProperties['fetch.min.bytes'] = 1024
consumerProperties['max.partition.fetch.bytes'] = 4194304

consumer = KafkaConsumer(config=consumerProperties,
                         topic='Your_Topic',
                         schema=CommonSchema.String)
topology = Topology()
fromKafka = topology.source(consumer)

When trusted certificates, or client certificates and private keys, are required to connect with a Kafka cluster, the function create_connection_properties helps to create keystores for the certificates and keys, and to create the right properties.

In IBM Cloud Pak for Data it is also possible to create application configurations for consumer and producer properties. An application configuration is a safe place to store sensitive data. Use the function configure_connection_from_properties to create an application configuration for Kafka properties.

Example with use of an application configuration for use on IBM Cloud Pak for Data:

from icpd_core import icpd_util

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.rest import Instance
import streamsx.topology.context

import streamsx.kafka as kafka

topology = Topology('ConsumeFromKafka')

connection_properties = kafka.create_connection_properties(
    bootstrap_servers='kafka-bootstrap.192.168.42.183.nip.io:443',
    #use_TLS=True,
    #enable_hostname_verification=True,
    cluster_ca_cert='/tmp/secrets/cluster_ca_cert.pem',
    authentication = kafka.AuthMethod.SCRAM_SHA_512,
    username = 'user123',
    password = 'passw0rd', # not the very best choice for a password
    topology = topology)

consumer_properties = dict()
# In this example we read only transactionally committed messages
consumer_properties['isolation.level'] = 'read_committed'
# add connection specific properties to the consumer properties
consumer_properties.update(connection_properties)
# get the streams instance in IBM Cloud Pak for Data
instance_cfg = icpd_util.get_service_instance_details(name='instanceName')
instance_cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
streams_instance = Instance.of_service(instance_cfg)

# create the application configuration
appconfig_name = kafka.configure_connection_from_properties(
    instance=streams_instance,
    name='kafkaConsumerProps',
    properties=consumer_properties,
    description='Consumer properties for authenticated access')

consumer = kafka.KafkaConsumer(config=appconfig_name,
                               topic='mytopic',
                               schema=CommonSchema.String)

fromKafka = topology.source(consumer)

Messages

The schema of the stream defines how messages are handled.

  • CommonSchema.String - Each message is a UTF-8 encoded string. No key is used.

  • CommonSchema.Json - Each message is a UTF-8 encoded serialized JSON object. No key is used.

  • CommonSchema.Binary - Each message is a blob. No key is used.

  • StringMessage - structured schema with message and key

  • BinaryMessage - structured schema with message and key

  • StringMessageMeta - structured schema with message, key, and message meta data

  • BinaryMessageMeta - structured schema with message, key, and message meta data

When other schemas are used, the attribute names for the message and the message key must be specified when they do not have the default names message and key, respectively. When message metadata (topic, partition, message timestamp, and offset) is to be received, the names of those attributes must be specified when they do not have the default names topic, partition, messageTimestamp, or offset. Receiving message metadata is optional.
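For example, a sketch of a user-defined schema with non-default attribute names; the broker address and topic are placeholders:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
from streamsx.kafka import KafkaConsumer

# user-defined schema with non-default names for message, key, and some metadata
schema = StreamSchema('tuple<rstring body, rstring msg_key, rstring src_topic, int32 part, int64 msg_ts>')

consumer = KafkaConsumer(config={'bootstrap.servers': 'kafka-host.domain:9092'},
                         topic='MY_TOPIC',
                         schema=schema,
                         message_attribute_name='body',
                         key_attribute_name='msg_key',
                         topic_attribute_name='src_topic',
                         partition_attribute_name='part',
                         timestamp_attribute_name='msg_ts')

topology = Topology('CustomSchemaConsumer')
from_kafka = topology.source(consumer)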

Sample

A simple hello world example of a Streams application publishing to a topic and the same application consuming the same topic:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit, ContextTypes
from streamsx.kafka import KafkaConsumer, KafkaProducer
import time

def delay(v):
    time.sleep(5.0)
    return True

topology = Topology('KafkaHelloWorld')

to_kafka = topology.source(['Hello', 'World!'])
to_kafka = to_kafka.as_string()
# delay tuple by tuple
to_kafka = to_kafka.filter(delay)

# Publish a stream to Kafka using TEST topic, the Kafka server is at localhost
producer = KafkaProducer({'bootstrap.servers': 'localhost:9092'}, 'TEST')
to_kafka.for_each(producer)

# Subscribe to same topic as a stream
consumer = KafkaConsumer({'bootstrap.servers': 'localhost:9092'}, 'TEST', CommonSchema.String)
from_kafka = topology.source(consumer)

# You'll find the Hello World! in stdout log file:
from_kafka.print()

submit(ContextTypes.DISTRIBUTED, topology)
# The Streams job is kept running.
class streamsx.kafka.AuthMethod

Bases: enum.Enum

Defines authentication methods for Kafka.

NONE = 0

No authentication

New in version 1.3.

PLAIN = 2

PLAIN, or SASL/PLAIN, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. SASL/PLAIN should only be used with SSL as transport layer to ensure that clear passwords are not transmitted on the wire without encryption.

New in version 1.3.

SCRAM_SHA_512 = 3

Authentication with the SASL/SCRAM-SHA-512 method. A username and a password are required.

New in version 1.3.

TLS = 1

Mutual TLS authentication with X.509 certificates. A client certificate and a client private key are required.

New in version 1.3.

class streamsx.kafka.KafkaConsumer(config, topic, schema, message_attribute_name=None, key_attribute_name=None, topic_attribute_name=None, partition_attribute_name=None, timestamp_attribute_name=None, offset_attribute_name=None, **options)

Bases: streamsx.kafka._kafka._KafkaComposite, streamsx.topology.composite.Source

Represents a source for messages read from Kafka, which can be passed to Topology.source() to create a stream.

A KafkaConsumer subscribes to one or more topics and can build a consumer group by setting group_size to a value greater than one. In this case, the KafkaConsumer is the start of a parallel region.

The KafkaConsumer can also be the start of a periodic consistent region:

from streamsx.topology.state import ConsistentRegionConfig
from streamsx.topology.topology import Topology
from streamsx.kafka import KafkaConsumer
from streamsx.kafka.schema import Schema

consumer = KafkaConsumer(config={'bootstrap.servers': 'kafka-server.domain:9092'},
                         topic='myTopic',
                         schema=Schema.StringMessageMeta,
                         group_size = 3,
                         group_id = "my-consumer-group")

topology = Topology("KafkaConsumer")
from_kafka = topology.source(consumer, "SourceName").set_consistent(ConsistentRegionConfig.periodic(period=60))

Operator driven consistent region is not supported by the KafkaConsumer.

The topic and the group_id can also be submission time parameters. A submission parameter is a handle for a value that is not defined until topology submission time. Submission parameters enable the creation of reusable topology bundles:

from streamsx.topology.topology import Topology
from streamsx.kafka import KafkaConsumer
from streamsx.kafka.schema import Schema

consumer = KafkaConsumer(config={'bootstrap.servers': 'kafka-server.domain:9092'},
                         topic=KafkaConsumer.submission_parameter("topic"),
                         schema=Schema.StringMessageMeta)

topology = Topology("KafkaConsumer")
from_kafka = topology.source(consumer, "SourceName")
Parameters
  • config (str|dict) – The name of an application configuration (str) with consumer configs or a dict with consumer configs

  • topic (str|list|streamsx.spl.op.Expression) – Single topic or list of topics to subscribe messages from

  • schema (StreamSchema) –

    Schema for the output stream

    Valid schemas are:

    • CommonSchema.String - pre-defined, each message is a UTF-8 encoded string.

    • CommonSchema.Json - pre-defined, each message is a UTF-8 encoded serialized JSON object.

    • CommonSchema.Binary - pre-defined, each message is a binary object (byte array).

    • StringMessage - pre-defined, structured schema with message and key

    • BinaryMessage - pre-defined, structured schema with message and key

    • StringMessageMeta - pre-defined, structured schema with message, key, and message meta data

    • BinaryMessageMeta - pre-defined, structured schema with message, key, and message meta data

    • User defined schemas. When user defined schemas are used, the attribute names for message and key must be given if they differ from the defaults message and key. Receiving the key is optional.

  • message_attribute_name (str) – The attribute name in the stream that receives the message content of the Kafka message. Required for user-defined schema when the attribute name is different from message.

  • key_attribute_name (str) – The attribute name in the stream that receives the key of the Kafka message. Required for user-defined schema when the attribute name is different from key.

  • topic_attribute_name (str) – The attribute name in the stream that receives the topic of the Kafka message. Required for user-defined schema when the attribute name is different from topic. The attribute in the schema is optional.

  • partition_attribute_name (str) – The attribute name in the stream that receives the partition number of the Kafka message. Required for user-defined schema when the attribute name is different from partition. The attribute in the schema must have the type int and is optional in the schema.

  • timestamp_attribute_name (str) – The attribute name in the stream that receives the timestamp of the Kafka message. Required for user-defined schema when the attribute name is different from messageTimestamp. The attribute in the schema must have the type int and is optional in the schema. The message timestamp is measured in milliseconds since Unix Epoch.

  • offset_attribute_name (str) – The attribute name in the stream that receives the offset of the Kafka message. Required for user-defined schema when the attribute name is different from offset. The attribute in the schema must have the type int and is optional in the schema.

  • **options (kwargs) –

    optional arguments as keyword arguments; a short example follows this list. The following arguments are supported:

    • group_id

    • group_size

    • client_id

    • app_config_name

    • consumer_config - these configs override the configs given as the config parameter by being merged with them. This also applies to configs stored in an application configuration.

    • ssl_debug

    • vm_arg

New in version 1.8.0.
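A sketch that passes several of these keyword arguments at construction time; the broker, topic, and shown config values are placeholders:

from streamsx.kafka import KafkaConsumer
from streamsx.kafka.schema import Schema

consumer = KafkaConsumer(config={'bootstrap.servers': 'kafka-host.domain:9092'},
                         topic='MY_TOPIC',
                         schema=Schema.StringMessage,
                         group_id='my-consumer-group',
                         group_size=3,
                         client_id='my-consumer-client',
                         # consumer_config is merged into, and overrides, the config parameter
                         consumer_config={'max.poll.records': 200},
                         ssl_debug=False,
                         vm_arg='-Xmx2G')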

property app_config_name

The name of an application configuration with consumer configurations. The application configuration must exist when the topology is submitted.

Type

str

property client_id

An optional client identifier for Kafka. The client identifier has no function for the application, but allows the Kafka client to be identified within the Kafka server.

The client identifier given here overrides the consumer config client.id if given in an application configuration or within the consumer configuration.

Note

Each instance of KafkaConsumer and KafkaProducer must have a different client identifier.

Type

str

property consumer_config

The consumer configuration

The consumer configuration must be a dict in which the keys are the consumer configs defined by Kafka. A consumer configuration can be created with create_connection_properties(). The properties given as consumer_config override the same properties in an application configuration if present.

Type

dict

property group_id

The identifier of a Kafka consumer group. This attribute goes into the group.id consumer config and overrides the same property if given as consumer_config. When no group identifier is given, a group identifier is generated from the job name and the subscribed topics.

Type

str

property group_size

The size of a Kafka consumer group, in which multiple consumers share the partitions of the subscribed topics.

With group_size greater than 1, the source stream is split into multiple channels as the start of a parallel region.

This is effectively the same as Stream.set_parallel(width=group_size).

The parallel region can be ended, for example, with Stream.end_parallel().

Type

int
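For illustration, a minimal sketch of a consumer group of three that ends its parallel region before further processing; the broker and topic are placeholders:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.kafka import KafkaConsumer

topology = Topology('ConsumerGroup')
consumer = KafkaConsumer(config={'bootstrap.servers': 'kafka-host.domain:9092'},
                         topic='MY_TOPIC',
                         schema=CommonSchema.String,
                         group_size=3)
# the source starts a parallel region with 3 channels; end it to merge the channels
from_kafka = topology.source(consumer).end_parallel()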

populate(topology, name, **options) → streamsx.topology.topology.Stream

Populate the topology with this composite source.

Parameters
  • topology – Topology containing the source.

  • name – Name passed into source.

  • **options – Future options passed to source.

Returns

Single stream representing the source.

Return type

Stream

property ssl_debug

When True the property enables verbose SSL debug output at runtime.

Type

bool

static submission_parameter(name, default=None)

Create an expression for a submission time parameter. A submission parameter is a handle for a value that is not defined until topology submission time. Submission parameters enable the creation of reusable topology bundles.

Parameters
  • name (str) – The name of the submission time parameter.

  • default (str) – The default value to be used when the parameter is not set at submission time.

Returns

an expression to use a submission time parameter

Return type

streamsx.spl.op.Expression
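A sketch using a default value; the broker address is a placeholder, and 'DEFAULT_TOPIC' is used when no value is supplied at submission time:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.kafka import KafkaConsumer

topology = Topology('ParameterizedConsumer')
consumer = KafkaConsumer(config={'bootstrap.servers': 'kafka-host.domain:9092'},
                         # resolved at submission time, 'DEFAULT_TOPIC' is the fallback
                         topic=KafkaConsumer.submission_parameter('topic', default='DEFAULT_TOPIC'),
                         schema=CommonSchema.String)
from_kafka = topology.source(consumer)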

property vm_arg

Arguments for the Java Virtual Machine used at runtime, for example -Xmx2G. For multiple arguments, use a list:

consumer.vm_arg = ["-Xmx2G", "-Xms512M"]
Type

str|list

class streamsx.kafka.KafkaProducer(config, topic, message_attribute_name=None, key_attribute_name=None, **options)

Bases: streamsx.kafka._kafka._KafkaComposite, streamsx.topology.composite.ForEach

The KafkaProducer represents a stream termination that publishes each tuple as a message to one or more Kafka topics. Instances can be passed to Stream.for_each() to create a sink (stream termination).

Trivial example:

producer = KafkaProducer(config={'bootstrap.servers': 'kafka-server.domain:9092'},
                         topic="topic")
stream_to_publish.for_each(producer)

Example with two topics in IBM Event Streams cloud service:

eventstreams_credentials_json = "..."
producer = KafkaProducer(create_connection_properties_for_eventstreams(eventstreams_credentials_json),
                         topic=["topic1", "topic2"])
stream_to_publish.for_each(producer)

The topic can also be a submission time parameter. A submission parameter is a handle for a value that is not defined until topology submission time. Submission parameters enable the creation of reusable topology bundles:

producer = KafkaProducer(config={'bootstrap.servers': 'kafka-server.domain:9092'},
                         topic=KafkaProducer.submission_parameter("topic"))
stream_to_publish.for_each(producer)
Parameters
  • config (str|dict) – The name of an application configuration (str) with producer configs or a dict with producer configs

  • topic (str|list|streamsx.spl.op.Expression) – Topic or topics to publish messages

  • message_attribute_name (str) – the attribute name in the schema that is used as the message to publish. Required for user-defined schema when the attribute name is different from message.

  • key_attribute_name (str) – the attribute name in the schema that is used as the key for the Kafka message to publish. Required for user-defined schema when the attribute name is different from key.

  • **options (kwargs) –

    optional arguments as keyword arguments; a short example follows this list. The following arguments are supported:

    • client_id

    • app_config_name

    • producer_config - these configs override the configs given as the config parameter by being merged with them. This also applies to configs stored in an application configuration.

    • ssl_debug

    • vm_arg

New in version 1.8.0.
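A sketch passing several of these keyword arguments; the broker, topic, and shown producer config are placeholders, and stream_to_publish stands for an existing stream:

from streamsx.kafka import KafkaProducer

# stream_to_publish is an existing stream, e.g. with CommonSchema.String schema
producer = KafkaProducer(config={'bootstrap.servers': 'kafka-host.domain:9092'},
                         topic='MY_TOPIC',
                         client_id='my-producer-client',
                         # producer_config is merged into, and overrides, the config parameter
                         producer_config={'compression.type': 'gzip'},
                         ssl_debug=False,
                         vm_arg='-Xmx2G')
stream_to_publish.for_each(producer)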

property app_config_name

The name of an application configuration with producer configurations. The application configuration must exist when the topology is submitted.

Type

str

property client_id

An optional client identifier for Kafka. The client identifier has no function for the application, but allows the Kafka client to be identified within the Kafka server.

The client identifier given here overrides the producer config client.id if given in an application configuration or within the producer configuration.

Note

Each instance of KafkaConsumer and KafkaProducer must have a different client identifier.

Type

str

populate(topology, stream, name, **options) → streamsx.topology.topology.Sink

Populate the topology with this composite for each transformation.

Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • name – Name passed into for_each.

  • **options – Future options passed to for_each.

Returns

Termination for this composite transformation of stream.

Return type

Sink

property producer_config

The producer configuration

The producer configuration must be a dict in which the keys are the producer configs defined by Kafka. A producer configuration can be created with create_connection_properties(). The properties given as producer_config override the same properties in an application configuration if present.

Type

dict

property ssl_debug

When True, the property enables verbose SSL debug output at runtime when SSL connections are used.

Type

bool

static submission_parameter(name, default=None)

Create an expression for a submission time parameter. A submission parameter is a handle for a value that is not defined until topology submission time. Submission parameters enable the creation of reusable topology bundles.

Parameters
  • name (str) – The name of the submission time parameter.

  • default (str) – The default value to be used when the parameter is not set at submission time.

Returns

an expression to use a submission time parameter

Return type

streamsx.spl.op.Expression

property vm_arg

Arguments for the Java Virtual Machine used at runtime, for example -Xmx2G. For multiple arguments, use a list:

producer.vm_arg = ["-Xmx2G", "-Xms512M"]
Type

str|list

streamsx.kafka.download_toolkit(url=None, target_dir=None)

Downloads the latest Kafka toolkit from GitHub.

Example for updating the Kafka toolkit for your topology with the latest toolkit from GitHub:

import streamsx.kafka as kafka
# download Kafka toolkit from GitHub
kafka_toolkit_location = kafka.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, kafka_toolkit_location)

Example for updating the topology with a specific version of the Kafka toolkit using a URL:

import streamsx.kafka as kafka
url201 = 'https://github.com/IBMStreams/streamsx.kafka/releases/download/v2.0.1/com.ibm.streamsx.kafka-2.0.1.tgz'
kafka_toolkit_location = kafka.download_toolkit(url=url201)
streamsx.spl.toolkit.add_toolkit(topology, kafka_toolkit_location)
Parameters
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.

  • target_dir (str) – the directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None a location relative to the system temporary directory is chosen.

Returns

the location of the downloaded Kafka toolkit

Return type

str

Note

This function requires an outgoing Internet connection

New in version 1.3.

streamsx.kafka.create_connection_properties(bootstrap_servers, use_TLS=True, enable_hostname_verification=True, cluster_ca_cert=None, authentication=<AuthMethod.NONE: 0>, client_cert=None, client_private_key=None, username=None, password=None, topology=None)

Create Kafka properties that can be used to connect a consumer or a producer with a Kafka cluster when certificates and keys or authentication are required. The resulting properties can be used for example in configure_connection_from_properties(), as the config or consumer_config argument of KafkaConsumer, and as the config or producer_config argument of KafkaProducer.

When certificates are given, the function will create a truststore and/or a keystore, which are added as file dependencies to the topology, which must not be None in this case.

Certificates and keys are given as strings. The arguments can be the name of an existing PEM formatted file, whose content is read, or the PEM formatted certificate or key itself. The PEM format is a text format with base64 encoded content between anchors:

-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----

or:

-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----

Example, in which the brokers are configured for SCRAM-SHA-512 authentication over a TLS connection, and where the server certificates are not signed by a public CA. In the example, the CA certificate for the cluster is stored in the file /tmp/secrets/cluster_ca.crt:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
import streamsx.kafka as kafka

consumerTopology = Topology('ConsumeFromKafka')

consumerProperties = dict()

# In this example we read only transactionally committed messages
consumerProperties['isolation.level'] = 'read_committed'
connectionProps = kafka.create_connection_properties(
    bootstrap_servers = 'kafka-cluster1-kafka-bootstrap-myproject.192.168.42.183.nip.io:443',
    #use_TLS = True,
    #enable_hostname_verification = True,
    cluster_ca_cert = '/tmp/secrets/cluster_ca.crt',
    authentication = kafka.AuthMethod.SCRAM_SHA_512,
    username = 'user123',
    password = 'passw0rd', # not the very best choice for a password
    topology = consumerTopology)

# add connection specific properties to the consumer properties
consumerProperties.update(connectionProps)
messages = kafka.subscribe(consumerTopology, 'mytopic', consumerProperties, CommonSchema.String)
Parameters
  • bootstrap_servers (str) – The bootstrap address of the Kafka cluster. This is a single hostname:TCPport pair or a comma separated list of hostname:TCPport pairs, for example 'server1:9093', or 'server1:9093,server2:9093,server3:9093'.

  • use_TLS (bool) – When True (default), the client connects via encrypted connections with the Kafka brokers. In this case it may also be necessary to provide the CA certificate of the cluster within the cluster_ca_cert parameter. When the parameter is False, the traffic to the Kafka brokers is not encrypted.

  • enable_hostname_verification (bool) –

    When True (default), the hostname verification of the presented server certificate is enabled. For example, some methods to expose a containerized Kafka cluster do not support hostname verification. In these cases hostname verification must be disabled.

    The parameter is ignored when use_TLS is False.

  • cluster_ca_cert (str|list) –

    The CA certificate of the broker certificates or a list of them. This certificate is required when the cluster does not use certificates signed by a public CA authority. The parameter must be the name of an existing PEM formatted file or the PEM formatted certificate itself, or a list of filenames or PEM strings. These certificates are treated as trusted and go into a truststore.

    A trusted certificate must have a text format like this:

    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
    

    The parameter is ignored when use_TLS is False.

  • authentication (AuthMethod) –

    The authentication method used by the brokers.

    • None, AuthMethod.NONE: clients are not authenticated. The parameters client_cert, client_private_key, username, and password are ignored.

    • AuthMethod.PLAIN: PLAIN, or SASL/PLAIN, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. SASL/PLAIN should only be used with SSL as transport layer to ensure that clear passwords are not transmitted on the wire without encryption.

    • AuthMethod.TLS: clients are authenticated with client certificates. This authentication method can only be used when the client uses a TLS connection, i.e. when use_TLS is True. The client certificate must be trusted by the server, and must be given as the client_cert parameter together with the corresponding private key as the client_private_key parameter.

    • AuthMethod.SCRAM_SHA_512: SCRAM (Salted Challenge Response Authentication Mechanism) is an authentication protocol that can establish authentication using usernames and passwords. It can be used with or without a TLS client connection. This authentication method requires that the parameters username and password are used.

  • client_cert (str) –

    The client certificate, i.e. the public part of a key pair signed by an authority that is trusted by the brokers. The parameter must be the name of an existing PEM formatted file or the PEM formatted certificate itself. The client certificate must have a text format like this:

    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
    

    The parameter is ignored when authentication is not ‘TLS’.

  • client_private_key (str) –

    The private part of the RSA key pair on which the client certificate is based. The parameter must be the name of an existing PEM formatted file or the PEM formatted key itself. The private key must have a text format like this:

    -----BEGIN PRIVATE KEY-----
    ...
    -----END PRIVATE KEY-----
    

    The parameter is ignored when authentication is not ‘TLS’.

  • username (str) – The username for SCRAM authentication. The parameter is ignored when authentication is not ‘SCRAM-SHA-512’.

  • password (str) – The password for SCRAM authentication. The parameter is ignored when authentication is not ‘SCRAM-SHA-512’.

  • topology (Topology) – The topology to which a truststore and/or keystore are added as file dependencies. It must be the Topology instance in which the created Kafka properties are used. The parameter must not be None when one of the parameters cluster_ca_cert, client_cert, or client_private_key is not None.

Returns

Kafka properties

Return type

dict

Note

When certificates are needed, this function can be used only with the streamsx.kafka toolkit version 1.9.2 and higher. The function will add a toolkit dependency to the topology. When the toolkit dependency cannot be satisfied, use a newer toolkit version. A newer version of the toolkit can be downloaded from GitHub with download_toolkit().

Warning

The returned properties can contain sensitive data. Storing the properties in an application configuration is a good idea to avoid exposing sensitive information. On IBM Cloud Pak for Data use configure_connection_from_properties() to do this.

New in version 1.3.

streamsx.kafka.create_connection_properties_for_eventstreams(credentials)

Create Kafka configuration properties from service credentials for IBM Event Streams to connect with the IBM Event Streams service. The resulting properties can be used for example in configure_connection_from_properties(), as the config or consumer_config argument of KafkaConsumer, and as the config or producer_config argument of KafkaProducer.

Parameters

credentials (dict|str) – The service credentials for IBM Event Streams as a JSON dict or as a string. When a string is given, the parameter must be the name of an existing JSON file with credentials or must contain the raw JSON credentials.

Returns

Kafka properties that contain the connection information.

Return type

dict

Warning

The returned properties contain sensitive data. Storing the properties in an application configuration is a good idea to avoid exposing sensitive information. On IBM Cloud Pak for Data use configure_connection_from_properties() to do this.

New in version 1.7.
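For example, a sketch that creates producer properties from credentials stored in a file; the file path and topic name are placeholders:

from streamsx.topology.topology import Topology
from streamsx.kafka import KafkaProducer, create_connection_properties_for_eventstreams

topology = Topology('PublishToEventStreams')
to_kafka = topology.source(['Hello', 'Event Streams!']).as_string()

# assume the service credentials are stored in the file /tmp/eventstreams.json
producer = KafkaProducer(config=create_connection_properties_for_eventstreams('/tmp/eventstreams.json'),
                         topic='MY_TOPIC')
to_kafka.for_each(producer)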

streamsx.kafka.configure_connection(instance, name, bootstrap_servers, ssl_protocol=None, enable_hostname_verification=True)

Configures IBM Streams for a connection with a Kafka broker.

Creates an application configuration object containing the required properties with connection information. The application configuration contains the following properties:

  • bootstrap.servers

  • security.protocol (when ssl_protocol is not None)

  • ssl.protocol (when ssl_protocol is not None)

  • ssl.endpoint.identification.algorithm (when enable_hostname_verification is False)

Example for creating a configuration for a Streams instance with connection details:

from streamsx.rest import Instance
import streamsx.topology.context
from streamsx.kafka import configure_connection
from icpd_core import icpd_util

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
bootstrap_servers = 'kafka-server-1.domain:9093,kafka-server-2.domain:9093,kafka-server-3.domain:9093'
app_cfg_name = configure_connection(instance, 'my_app_cfg1', bootstrap_servers, 'TLSv1.2')
Parameters
  • instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.

  • name (str) – Name of the application configuration.

  • bootstrap_servers (str) – Comma separated list of hostname:TCPport pairs of the Kafka bootstrap servers, for example 'server1:9093', or 'server1:9093,server2:9093,server3:9093'.

  • ssl_protocol (str) – One of None, ‘TLS’, ‘TLSv1’, ‘TLSv1.1’, or ‘TLSv1.2’. If None is used, TLS is not configured. If unsure, use ‘TLS’, which is Kafka’s default.

  • enable_hostname_verification (bool) – True (default) enables hostname verification of the server certificate, False disables hostname verification. The parameter is ignored, when ssl_protocol is None.

Returns

Name of the application configuration, i.e. the same value as given in the name parameter

Return type

str

Warning

The function can be used only in IBM Cloud Pak for Data

New in version 1.1.

streamsx.kafka.configure_connection_from_properties(instance, name, properties, description=None)

Configures IBM Streams for a connection with a Kafka broker.

Creates an application configuration object containing the required properties with connection information. The application configuration contains the properties given as key-value pairs in the properties dictionary. The keys must be valid consumer or producer configurations.

Example for creating a configuration for a Streams instance with connection details:

from streamsx.rest import Instance
import streamsx.topology.context
from streamsx.kafka import create_connection_properties, configure_connection_from_properties
from icpd_core import icpd_util

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
bootstrap_servers = 'kafka-server-1.domain:9093,kafka-server-2.domain:9093,kafka-server-3.domain:9093'
consumer_properties = create_connection_properties(bootstrap_servers=bootstrap_servers)
app_cfg_name = configure_connection_from_properties(instance, 'my_app_cfg1', consumer_properties, 'KafkaConsumerConfig')
Parameters
  • instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.

  • name (str) – Name of the application configuration.

  • properties (dict) – Properties containing valid consumer or producer configurations.

  • description (str) – Description of the application configuration. If no description is given, a description is generated.

Returns

Name of the application configuration, i.e. the same value as given in the name parameter

Return type

str

Warning

The function can be used only in IBM Cloud Pak for Data

New in version 1.3.

streamsx.kafka.publish(stream, topic, kafka_properties, name=None)

Publish messages to a topic in a Kafka broker.

Adds a Kafka producer where each tuple on stream is published as a message to the topic.

Parameters
  • stream (Stream) – Stream of tuples to be published as messages.

  • topic (str) – Topic to publish messages to.

  • kafka_properties (dict|str) – Properties containing the producer configurations, at minimum the bootstrap.servers property. When a string is given, it is the name of the application configuration, which contains producer configs. Must not be None.

  • name (str) – Producer name in the Streams context, defaults to a generated name.

Returns

Stream termination.

Return type

streamsx.topology.topology.Sink

Deprecated since version 1.8.0: Use the KafkaProducer to terminate a stream by publishing messages to Kafka.

streamsx.kafka.subscribe(topology, topic, kafka_properties, schema, group=None, name=None)

Subscribe to messages from a Kafka broker for a single topic.

Adds a Kafka consumer that subscribes to a topic and converts each message to a stream tuple.

Parameters
  • topology (Topology) – Topology that will contain the stream of messages.

  • topic (str) – Topic to subscribe messages from.

  • kafka_properties (dict|str) – Properties containing the consumer configurations, at minimum the bootstrap.servers property. When a string is given, it is the name of the application configuration, which contains consumer configs. Must not be None.

  • schema (StreamSchema) – Schema for returned stream.

  • group (str) – Kafka consumer group identifier. When not specified it defaults to the job name with the topic appended, separated by an underscore, so that all subscribe invocations within the topology for the same topic form a Kafka consumer group.

  • name (str) – Consumer name in the Streams context, defaults to a generated name.

Returns

Stream containing messages.

Return type

streamsx.topology.topology.Stream

Deprecated since version 1.8.0: Use the KafkaConsumer to create a source stream from a Kafka subscription.

class streamsx.kafka.schema.Schema

Bases: object

Structured stream schemas for keyed messages for KafkaConsumer, and for streams that are published by KafkaProducer to Kafka topics.

The schemas StringMessage and BinaryMessage have the attributes message and key. They vary in the type of the message attribute and can be used for KafkaConsumer and for the stream published with KafkaProducer.

The schemas StringMessageMeta and BinaryMessageMeta have the attributes message, key, topic, partition, offset, and messageTimestamp. They vary in the type of the message attribute and can be used for KafkaConsumer.

All schemas defined in this class are instances of streamsx.topology.schema.StreamSchema.

The following sample uses structured schemas for publishing messages with keys to a possibly partitioned topic in a Kafka broker. Then, it creates a consumer group that subscribes to the topic, and processes the received messages in parallel channels partitioned by the message key:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes
from streamsx.topology.topology import Routing
from streamsx.topology.schema import StreamSchema
from streamsx.kafka.schema import Schema
from streamsx.kafka import KafkaConsumer, KafkaProducer

import random
import time
import json
from datetime import datetime


# Define a callable source for data that we push into Event Streams
class SensorReadingsSource(object):
    def __call__(self):
        # This is just an example of using generated data,
        # Here you could connect to db
        # generate data
        # connect to data set
        # open file
        i = 0
        # wait until the consumers are ready before we start creating data
        time.sleep(20.0)
        while(i < 100000):
            time.sleep(0.001)
            i = i + 1
            sensor_id = random.randint(1, 100)
            reading = {}
            reading["sensor_id"] = "sensor_" + str(sensor_id)
            reading["value"] = random.random() * 3000
            reading["ts"] = int(datetime.now().timestamp())
            yield reading


# parses the JSON in the message and adds the attributes to a tuple
def flat_message_json(tuple):
    messageAsDict = json.loads(tuple['message'])
    tuple.update(messageAsDict)
    return tuple


# calculate a hash code of a string in a consistent way
# needed for partitioned parallel streams
def string_hashcode(s):
    h = 0
    for c in s:
        h = (31 * h + ord(c)) & 0xFFFFFFFF
    return ((h + 0x80000000) & 0xFFFFFFFF) - 0x80000000


topology = Topology('KafkaGroupParallel')

#
# the producer part
#
# create the data and map them to the attributes 'message' and 'key' of the
# 'Schema.StringMessage' schema for Kafka, so that we have messages with keys
sensorStream = topology.source(
    SensorReadingsSource(),
    "RawDataSource"
    ).map(
        func=lambda reading: {'message': json.dumps(reading),
                              'key': reading['sensor_id']},
        name="ToKeyedMessage",
        schema=Schema.StringMessage)
# assume, we are running a Kafka broker at localhost:9092
producer = KafkaProducer(config={'bootstrap.servers': 'localhost:9092'}, topic="ThreePartitions")
sensorStream.for_each(producer)

#
# the consumer side
#
# subscribe, create a consumer group with 5 consumers

consumerSchema = Schema.StringMessageMeta
consumer = KafkaConsumer(config={'bootstrap.servers': 'localhost:9092'},
                         topic="ThreePartitions",
                         schema=consumerSchema)
consumer.group_id = "my_consumer_group"
consumer.group_size = 5

received = topology.source(consumer, name="SensorSubscribe").end_parallel()

# start a different parallel region partitioned by message key,
# so that each key always goes into the same parallel channel
receivedParallelPartitioned = received.parallel(
    5,
    routing=Routing.HASH_PARTITIONED,
    func=lambda x: string_hashcode(x['key']))

# schema extension, here we use the Python 2.7, 3 way
flattenedSchema = consumerSchema.extend(
    StreamSchema('tuple<rstring sensor_id, float64 value, int64 ts>'))

receivedParallelPartitionedFlattened = receivedParallelPartitioned.map(
    func=flat_message_json,
    name='JSON2Attributes',
    schema=flattenedSchema)

# validate by removing negative and zero values from the stream,
# pass only positive values and timestamps
receivedValidated = receivedParallelPartitionedFlattened.filter(
    lambda tup: (tup['value'] > 0) and (tup['ts'] > 0),
    name='Validate')

# end parallel processing and print the merged result stream to stdout log
receivedValidated.end_parallel().print()

submit(ContextTypes.DISTRIBUTED, topology)
BinaryMessage = <streamsx.topology.schema.StreamSchema object>

Stream schema with message and key, where the message is a binary object (sequence of bytes), and the key is a string.

The schema defines the following attributes:

  • message(bytes) - the message content

  • key(str) - the key for partitioning

This schema can be used both for KafkaConsumer and for the stream published with KafkaProducer.

New in version 1.2.
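A minimal sketch that maps raw bytes onto this schema for publishing; the broker and topic are placeholders:

from streamsx.topology.topology import Topology
from streamsx.kafka import KafkaProducer
from streamsx.kafka.schema import Schema

topology = Topology('BinaryPublish')
# map each blob to the 'message' and 'key' attributes of Schema.BinaryMessage
messages = topology.source([b'\x00\x01', b'\x02\x03']).map(
    lambda blob: {'message': blob, 'key': 'sensor_1'},
    schema=Schema.BinaryMessage)
producer = KafkaProducer(config={'bootstrap.servers': 'localhost:9092'}, topic='MY_TOPIC')
messages.for_each(producer)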

BinaryMessageMeta = <streamsx.topology.schema.StreamSchema object>

Stream schema with message, key, and message meta data, where the message is a binary object (sequence of bytes), and the key is a string.

The schema defines the following attributes:

  • message(bytes) - the message content

  • key(str) - the key for partitioning

  • topic(str) - the Kafka topic

  • partition(int) - the topic partition number (32 bit)

  • offset(int) - the offset of the message within the topic partition (64 bit)

  • messageTimestamp(int) - the message timestamp in milliseconds since epoch (64 bit)

This schema can be used for KafkaConsumer.

New in version 1.2.

StringMessage = <streamsx.topology.schema.StreamSchema object>

Stream schema with message and key, both being strings.

The schema defines the following attributes:

  • message(str) - the message content

  • key(str) - the key for partitioning

This schema can be used both for KafkaConsumer and for the stream published with KafkaProducer.

New in version 1.2.

StringMessageMeta = <streamsx.topology.schema.StreamSchema object>

Stream schema with message, key, and message meta data, where both message and key are strings.

The schema defines the following attributes:

  • message(str) - the message content

  • key(str) - the key for partitioning

  • topic(str) - the Kafka topic

  • partition(int) - the topic partition number (32 bit)

  • offset(int) - the offset of the message within the topic partition (64 bit)

  • messageTimestamp(int) - the message timestamp in milliseconds since epoch (64 bit)

This schema can be used for KafkaConsumer.

New in version 1.2.
