IBM Streams Kafka integration

Overview

This module allows a Streams application to subscribe to a Kafka topic as a stream and to publish messages to a Kafka topic from a stream of tuples.

Connection to a Kafka broker

The bootstrap servers of the Kafka broker can be defined using a Streams application configuration or within the Python code by using a dictionary variable. The name of the application configuration or the dictionary must be specified using the kafka_properties parameter of subscribe() or publish(). The minimum set of properties in the application configuration or dictionary contains bootstrap.servers, for example

config             value
bootstrap.servers  host1:port1,host2:port2,host3:port3

Other configs for Kafka consumers or Kafka producers can be added to the application configuration. When configurations are specified that are specific to consumers or producers only, it is recommended to use separate application configurations for publish and subscribe.
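
As an alternative to an application configuration, the properties can be passed as a dictionary variable. The following is a minimal sketch; the host names, ports, and the commented consumer config are placeholders, not values prescribed by this module:

# Minimal consumer properties passed as a dict; hosts and ports are placeholders.
consumer_props = {
    'bootstrap.servers': 'host1:9092,host2:9092,host3:9092',
    # consumer-only configs could be added here, for example:
    # 'auto.offset.reset': 'earliest',
}

Such a dictionary can then be passed directly as the kafka_properties argument of subscribe() or publish().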

Messages

The schema of the stream defines how messages are handled.

  • CommonSchema.String - Each message is a UTF-8 encoded string.
  • CommonSchema.Json - Each message is a UTF-8 encoded serialized JSON object.

No other formats are supported.
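
As a sketch of the Json case, a stream of Python dict objects can be converted to the Json schema with as_json() before publishing. The topic 'TEST' and the application configuration name 'kafka_props' are assumptions borrowed from the sample below:

from streamsx.topology.topology import Topology
import streamsx.kafka as kafka

topo = Topology('JsonExample')
# dict tuples are converted to the Json schema and published
# as UTF-8 encoded serialized JSON messages.
msgs = topo.source([{'greeting': 'Hello'}, {'greeting': 'World!'}])
kafka.publish(msgs.as_json(), 'TEST', 'kafka_props')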

Sample

A simple hello world example of a Streams application that publishes messages to a topic and, in the same application, consumes the same topic:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit
from streamsx.topology.context import ContextTypes
import streamsx.kafka as kafka
import time

def delay(v):
    time.sleep(5.0)
    return True

topo = Topology('KafkaHelloWorld')

to_kafka = topo.source(['Hello', 'World!'])
to_kafka = to_kafka.as_string()
# delay tuple by tuple
to_kafka = to_kafka.filter(delay)

# Publish a stream to Kafka using TEST topic, the Kafka servers
# (bootstrap.servers) are configured in the application configuration 'kafka_props'.
kafka.publish(to_kafka, 'TEST', 'kafka_props')

# Subscribe to same topic as a stream
from_kafka = kafka.subscribe(topo, 'TEST', 'kafka_props', CommonSchema.String)

# You'll find 'Hello' and 'World!' in the stdout log file:
from_kafka.print()

submit(ContextTypes.DISTRIBUTED, topo)
# The Streams job is kept running.

streamsx.kafka.subscribe(topology, topic, kafka_properties, schema, group=None, name=None)

Subscribe to messages from a Kafka broker for a single topic.

Adds a Kafka consumer that subscribes to a topic and converts each message to a stream tuple.

Parameters:
  • topology (Topology) – Topology that will contain the stream of messages.
  • topic (str) – Topic to subscribe messages from.
  • kafka_properties (str|dict) – Properties containing the consumer configuration, at minimum the bootstrap.servers property. When a string is given, it is the name of the application configuration that contains the consumer configs. Must not be None.
  • schema (StreamSchema) – Schema for returned stream.
  • group (str) – Kafka consumer group identifier. When not specified, it defaults to the job name with the topic appended, separated by an underscore.
  • name (str) – Consumer name in the Streams context, defaults to a generated name.
Returns:

Stream containing messages.

Return type:

Stream
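
A usage sketch for subscribe(); the group identifier 'my-consumers' and the application configuration name 'kafka_props' are placeholders:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
import streamsx.kafka as kafka

topo = Topology('SubscribeExample')
# Subscribe to topic 'TEST' with an explicit consumer group identifier
# instead of the generated default.
msgs = kafka.subscribe(topo, 'TEST', 'kafka_props',
                       CommonSchema.String, group='my-consumers')
msgs.print()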

streamsx.kafka.publish(stream, topic, kafka_properties, name=None)

Publish messages to a topic in a Kafka broker.

Adds a Kafka producer where each tuple on the stream is published as a message.

Parameters:
  • stream (Stream) – Stream of tuples to be published as messages.
  • topic (str) – Topic to publish messages to.
  • kafka_properties (str|dict) – Properties containing the producer configuration, at minimum the bootstrap.servers property. When a string is given, it is the name of the application configuration that contains the producer configs. Must not be None.
  • name (str) – Producer name in the Streams context, defaults to a generated name.
Returns:

Stream termination.

Return type:

streamsx.topology.topology.Sink
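
A usage sketch for publish(), passing the producer configuration as a dictionary instead of an application configuration name; host names and ports are placeholders:

from streamsx.topology.topology import Topology
import streamsx.kafka as kafka

topo = Topology('PublishExample')
# Producer configuration as a dict; bootstrap.servers values are placeholders.
producer_props = {'bootstrap.servers': 'host1:9092,host2:9092'}
to_kafka = topo.source(['Hello', 'World!']).as_string()
# publish() returns a Sink object that terminates the stream.
kafka.publish(to_kafka, 'TEST', producer_props, name='TestProducer')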
