IBM Streams Kafka integration¶
Overview¶
This module allows a Streams application to subscribe to
a Kafka topic
as a stream and to publish
messages to a Kafka topic from a stream
of tuples.
Connection to a Kafka broker¶
The bootstrap servers of the Kafka broker can be defined using a Streams application configuration or
within the Python code by using a dictionary variable.
The name of the application configuration or the dictionary itself must be passed as the kafka_properties
parameter to subscribe()
or publish()
.
The minimum set of properties in the application configuration or dictionary contains bootstrap.servers
, for example:
| config | value |
|---|---|
| bootstrap.servers | host1:port1,host2:port2,host3:port3 |
Other configs for Kafka consumers or Kafka producers can be added to the application configuration or dictionary.
When configurations are specified that apply only to consumers or only to producers, it is recommended
to use separate application configurations or variables of dict type for publish
and subscribe
.
The consumer and producer configs can be found in the Kafka documentation.
Please note that the underlying SPL toolkit already adjusts several configurations; consult the toolkit operator reference for details.
Connection parameter example:
import streamsx.kafka as kafka
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
consumerProperties = {}
consumerProperties['bootstrap.servers'] = 'kafka-host1.domain:9092,kafka-host2.domain:9092'
consumerProperties['fetch.min.bytes'] = '1024'
consumerProperties['max.partition.fetch.bytes'] = '4194304'
topology = Topology()
kafka.subscribe(topology, 'Your_Topic', consumerProperties, CommonSchema.String)
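The producer side can be configured the same way, or with the name of an application configuration. A minimal sketch, assuming an application configuration named 'kafka_producer_props' (a hypothetical name) containing at least bootstrap.servers exists in the Streams instance:
import streamsx.kafka as kafka
from streamsx.topology.topology import Topology

topology = Topology()
to_kafka = topology.source(['Hello', 'World!']).as_string()
# 'kafka_producer_props' is a hypothetical application configuration name;
# the configuration must contain at least the bootstrap.servers property
kafka.publish(to_kafka, 'Your_Topic', 'kafka_producer_props')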
Messages¶
The schema of the stream defines how messages are handled.
- CommonSchema.String - Each message is a UTF-8 encoded string.
- CommonSchema.Json - Each message is a UTF-8 encoded serialized JSON object.
- StringMessage - structured schema with message and key
- BinaryMessage - structured schema with message and key
- StringMessageMeta - structured schema with message, key, and message meta data
- BinaryMessageMeta - structured schema with message, key, and message meta data
No other formats are supported.
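For example, a subscription with CommonSchema.Json delivers each message as a parsed JSON object (a Python dict) to downstream callables. A minimal sketch; the topic name and broker address are assumptions:
import streamsx.kafka as kafka
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema

topology = Topology()
props = {'bootstrap.servers': 'kafka-host1.domain:9092'}
# each Kafka message is deserialized into a Python dict
from_kafka = kafka.subscribe(topology, 'Json_Topic', props, CommonSchema.Json)
from_kafka.filter(lambda d: d.get('value', 0) > 0).print()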
Sample¶
A simple Hello World example of a Streams application that publishes to a topic and subscribes to the same topic:
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit, ContextTypes
import streamsx.kafka as kafka
import time
def delay(v):
time.sleep(5.0)
return True
topology = Topology('KafkaHelloWorld')
to_kafka = topology.source(['Hello', 'World!'])
to_kafka = to_kafka.as_string()
# delay tuple by tuple
to_kafka = to_kafka.filter(delay)
# Publish a stream to Kafka using TEST topic, the Kafka servers
# (bootstrap.servers) are configured in the application configuration 'kafka_props'.
kafka.publish(to_kafka, 'TEST', 'kafka_props')
# Subscribe to same topic as a stream
from_kafka = kafka.subscribe(topology, 'TEST', 'kafka_props', CommonSchema.String)
# You'll find 'Hello' and 'World!' in the stdout log file:
from_kafka.print()
submit(ContextTypes.DISTRIBUTED, topology)
# The Streams job keeps running until it is cancelled.
streamsx.kafka.subscribe(topology, topic, kafka_properties, schema, group=None, name=None)¶
Subscribe to messages from a Kafka broker for a single topic.
Adds a Kafka consumer that subscribes to a topic and converts each message to a stream tuple.
Parameters: - topology (Topology) – Topology that will contain the stream of messages.
- topic (str) – Topic to subscribe messages from.
- kafka_properties (dict|str) – Properties containing the consumer configuration, at minimum the bootstrap.servers property. When a string is given, it is the name of the application configuration that contains the consumer configs. Must not be set to None.
- schema (StreamSchema) – Schema for the returned stream.
- group (str) – Kafka consumer group identifier. When not specified, it defaults to the job name with the topic appended, separated by an underscore.
- name (str) – Consumer name in the Streams context, defaults to a generated name.
Returns: Stream containing messages.
Return type: Stream
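A minimal usage sketch; the topic, group name, and broker address are assumptions:
import streamsx.kafka as kafka
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema

topology = Topology('SubscribeExample')
props = {'bootstrap.servers': 'localhost:9092'}
# join the consumer group 'my_group' instead of the generated default group
messages = kafka.subscribe(topology, 'Your_Topic', props, CommonSchema.String, group='my_group')
messages.print()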
streamsx.kafka.publish(stream, topic, kafka_properties, name=None)¶
Publish messages to a topic in a Kafka broker.
Adds a Kafka producer where each tuple on the stream is published as a stream message.
Parameters: - stream (Stream) – Stream of tuples to be published as messages.
- topic (str) – Topic to publish messages to.
- kafka_properties (dict|str) – Properties containing the producer configuration, at minimum the bootstrap.servers property. When a string is given, it is the name of the application configuration that contains the producer configs. Must not be set to None.
- name (str) – Producer name in the Streams context, defaults to a generated name.
Returns: Stream termination.
Return type: streamsx.topology.topology.Sink
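A minimal usage sketch; the topic and broker address are assumptions:
import streamsx.kafka as kafka
from streamsx.topology.topology import Topology

topology = Topology('PublishExample')
to_kafka = topology.source(['Hello', 'World!']).as_string()
props = {'bootstrap.servers': 'localhost:9092'}
# each string tuple becomes one Kafka message on 'Your_Topic'
kafka.publish(to_kafka, 'Your_Topic', props)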
streamsx.kafka.configure_connection(instance, name, bootstrap_servers, ssl_protocol=None)¶
Configures IBM Streams for a connection with a Kafka broker.
Creates an application configuration object containing the required properties with connection information.
Example for creating a configuration for a Streams instance with connection details:
from streamsx.rest import Instance
import streamsx.topology.context
from icpd_core import icpd_util

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
bootstrap_servers = 'kafka-server-1.domain:9093,kafka-server-2.domain:9093,kafka-server-3.domain:9093'
app_cfg_name = configure_connection(instance, 'my_app_cfg1', bootstrap_servers, 'TLSv1.2')
Parameters: - instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
- name (str) – Name of the application configuration.
- bootstrap_servers (str) – Comma-separated list of hostname:TCPport pairs of the Kafka servers.
- ssl_protocol (str) – One of None, ‘TLS’, ‘TLSv1’, ‘TLSv1.1’, or ‘TLSv1.2’. If None is used, TLS is not configured.
Returns: Name of the application configuration, i.e. the same value as given in the name parameter
Warning
The function can be used only in IBM Cloud Pak for Data.
New in version 1.1.
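The returned name can then be passed as the kafka_properties argument of subscribe() or publish(). A minimal sketch continuing the example above; the topic name is an assumption:
import streamsx.kafka as kafka
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema

topology = Topology()
# app_cfg_name is the application configuration created by configure_connection above
from_kafka = kafka.subscribe(topology, 'Your_Topic', app_cfg_name, CommonSchema.String)
from_kafka.print()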
Schemas for streams created with the subscribe()
function, and usable for
streams terminated with publish()
. All of these message types are keyed messages.
class streamsx.kafka.schema.Schema¶
Structured stream schemas for keyed messages for subscribe(), and for streams that are published by publish() to an Event Streams topic.
The schemas StringMessage and BinaryMessage have the attributes message and key. They vary in the type of the message attribute and can be used for subscribe() and for the stream published with publish().
The schemas StringMessageMeta and BinaryMessageMeta have the attributes message, key, topic, partition, offset, and messageTimestamp. They vary in the type of the message attribute and can be used for subscribe() and publish().
All schemas defined in this class are instances of streamsx.topology.schema.StreamSchema.
The following sample uses structured schemas for publishing messages with keys to a possibly partitioned topic in a Kafka broker. Then, it creates a consumer group that subscribes to the topic, and processes the received messages in parallel channels partitioned by the message key:
from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes
from streamsx.topology.topology import Routing
from streamsx.topology.schema import StreamSchema
from streamsx.kafka.schema import Schema
import streamsx.kafka as kafka
import random
import time
import json
from datetime import datetime

# Define a callable source for data that we push into Event Streams
class SensorReadingsSource(object):
    def __call__(self):
        # This is just an example of using generated data.
        # Here you could connect to a database, generate data,
        # connect to a data set, or open a file.
        i = 0
        # wait until the consumer is ready before we start creating data
        time.sleep(20.0)
        while i < 100000:
            time.sleep(0.001)
            i = i + 1
            sensor_id = random.randint(1, 100)
            reading = {}
            reading["sensor_id"] = "sensor_" + str(sensor_id)
            reading["value"] = random.random() * 3000
            reading["ts"] = int(datetime.now().timestamp())
            yield reading

# parses the JSON in the message and adds the attributes to a tuple
def flat_message_json(tuple):
    messageAsDict = json.loads(tuple['message'])
    tuple.update(messageAsDict)
    return tuple

# calculate a hash code of a string in a consistent way,
# needed for partitioned parallel streams
def string_hashcode(s):
    h = 0
    for c in s:
        h = (31 * h + ord(c)) & 0xFFFFFFFF
    return ((h + 0x80000000) & 0xFFFFFFFF) - 0x80000000

topology = Topology('KafkaGroupParallel')

#
# the producer part
#
# create the data and map it to the attributes 'message' and 'key' of the
# 'Schema.StringMessage' schema for Kafka, so that we have messages with keys
sensorStream = topology.source(
    SensorReadingsSource(),
    "RawDataSource"
).map(
    func=lambda reading: {'message': json.dumps(reading),
                          'key': reading['sensor_id']},
    name="ToKeyedMessage",
    schema=Schema.StringMessage)
# assume we are running a Kafka broker at localhost:9092
producer_configs = dict()
producer_configs['bootstrap.servers'] = 'localhost:9092'
kafkaSink = kafka.publish(
    sensorStream,
    topic="ThreePartitions",
    kafka_properties=producer_configs,
    name="SensorPublish")

#
# the consumer side
#
# subscribe, create a consumer group with 3 consumers
consumer_configs = dict()
consumer_configs['bootstrap.servers'] = 'localhost:9092'
consumerSchema = Schema.StringMessageMeta
received = kafka.subscribe(
    topology,
    topic="ThreePartitions",
    schema=consumerSchema,
    group='my_consumer_group',
    kafka_properties=consumer_configs,
    name="SensorSubscribe"
).set_parallel(3).end_parallel()

# start a different parallel region partitioned by message key,
# so that each key always goes into the same parallel channel
receivedParallelPartitioned = received.parallel(
    5,
    routing=Routing.HASH_PARTITIONED,
    func=lambda x: string_hashcode(x['key']))

# schema extension: add attributes for the flattened JSON fields
flattenedSchema = consumerSchema.extend(
    StreamSchema('tuple<rstring sensor_id, float64 value, int64 ts>'))
receivedParallelPartitionedFlattened = receivedParallelPartitioned.map(
    func=flat_message_json,
    name='JSON2Attributes',
    schema=flattenedSchema)

# validate by removing negative and zero values from the stream,
# pass only positive values and timestamps
receivedValidated = receivedParallelPartitionedFlattened.filter(
    lambda tup: (tup['value'] > 0) and (tup['ts'] > 0),
    name='Validate')

# end parallel processing and print the merged result stream to the stdout log
receivedValidated.end_parallel().print()

submit(ContextTypes.DISTRIBUTED, topology)
BinaryMessage = <streamsx.topology.schema.StreamSchema object>¶
Stream schema with message and key, where the message is a binary object (sequence of bytes), and the key is a string.
The schema defines the following attributes:
- message(bytes) - the message content
- key(str) - the key for partitioning
This schema can be used both for subscribe() and for streams that are published by publish().
New in version 1.2.
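A sketch publishing keyed binary messages; the topic, broker address, and sample data are assumptions:
import streamsx.kafka as kafka
from streamsx.kafka.schema import Schema
from streamsx.topology.topology import Topology

topology = Topology('BinaryPublish')
# map (key, payload) pairs to the 'message' and 'key' attributes
# of the Schema.BinaryMessage structured schema
msgs = topology.source([('k1', b'\x00\x01'), ('k2', b'\x02\x03')]).map(
    lambda t: {'message': t[1], 'key': t[0]},
    schema=Schema.BinaryMessage)
kafka.publish(msgs, 'Binary_Topic', {'bootstrap.servers': 'localhost:9092'})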
BinaryMessageMeta = <streamsx.topology.schema.StreamSchema object>¶
Stream schema with message, key, and message meta data, where the message is a binary object (sequence of bytes), and the key is a string. This schema can be used for subscribe().
The schema defines the following attributes:
- message(bytes) - the message content
- key(str) - the key for partitioning
- topic(str) - the Event Streams topic
- partition(int) - the topic partition number (32 bit)
- offset(int) - the offset of the message within the topic partition (64 bit)
- messageTimestamp(int) - the message timestamp in milliseconds since epoch (64 bit)
New in version 1.2.
StringMessage = <streamsx.topology.schema.StreamSchema object>¶
Stream schema with message and key, both being strings.
The schema defines the following attributes:
- message(str) - the message content
- key(str) - the key for partitioning
This schema can be used both for subscribe() and for streams that are published by publish().
New in version 1.2.
StringMessageMeta = <streamsx.topology.schema.StreamSchema object>¶
Stream schema with message, key, and message meta data, where both message and key are strings. This schema can be used for subscribe().
The schema defines the following attributes:
- message(str) - the message content
- key(str) - the key for partitioning
- topic(str) - the Event Streams topic
- partition(int) - the topic partition number (32 bit)
- offset(int) - the offset of the message within the topic partition (64 bit)
- messageTimestamp(int) - the message timestamp in milliseconds since epoch (64 bit)
New in version 1.2.
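A sketch that subscribes with this schema and prints the meta data attributes; the topic and broker address are assumptions:
import streamsx.kafka as kafka
from streamsx.kafka.schema import Schema
from streamsx.topology.topology import Topology

topology = Topology('MetaExample')
props = {'bootstrap.servers': 'localhost:9092'}
received = kafka.subscribe(topology, 'Your_Topic', props, Schema.StringMessageMeta)
# tuples carry message, key, topic, partition, offset, and messageTimestamp,
# accessed here with dict-style indexing as in the consumer group sample above
received.map(lambda t: '{}[{}]@{}: {}'.format(
    t['topic'], t['partition'], t['offset'], t['message'])).print()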