Striim 3.9.4 / 3.9.5 documentation

KafkaWriter

Writes to a topic in Apache Kafka.

There are five versions of KafkaWriter: 0.8.0, 0.9.0, 0.10.0, 0.11.0, and 2.1.0. Use the one that corresponds to the target Kafka broker. For example, to use 0.9.0, the syntax is CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. If writing to the internal Kafka instance, use 0.11.0.
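For example, a minimal target declaration for a 0.8.0 broker might look like the following sketch (the stream name, topic name, and broker address are placeholders, and JSONFormatter is just one of the supported formatters):

CREATE TARGET KafkaSample USING KafkaWriter VERSION '0.8.0' (
  brokerAddress: 'localhost:9092',
  topic: 'MyTopic'
)
FORMAT USING JSONFormatter ()
INPUT FROM MyStream;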

Known issue DEV-13039: an application using KafkaWriter 0.9 or 0.10 will crash if the Kafka broker goes offline.

property | type | default value | notes
---------|------|---------------|------
brokerAddress | java.lang.String | |
KafkaConfig | java.lang.String | | Optionally, specify Kafka producer properties, separated by semicolons. See the table below for details.
KafkaConfigPropertySeparator | java.lang.String | ; | Specify a different separator if one of the producer property values specified in KafkaConfig contains a semicolon.
KafkaConfigValueSeparator | java.lang.String | = | Specify a different separator if one of the producer property values specified in KafkaConfig contains an equals sign.
mode | java.lang.String | Sync | See Setting KafkaWriter's mode property: sync versus async.
ParallelThreads | java.lang.Integer | | See Creating multiple writer instances.
PartitionKey | java.lang.String | | The name of a field in the input stream whose values determine how events are distributed among multiple partitions. Events with the same partition key field value will be written to the same partition. If the input stream is of any type except WAEvent, specify the name of one of its fields. If the input stream is of the WAEvent type, specify a field in the METADATA map (see HP NonStop reader WAEvent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields, or MSSQLReader WAEvent fields) using the syntax @METADATA(<field name>), or a field in the USERDATA map (see Adding user-defined data to WAEvent streams) using the syntax @USERDATA(<field name>).
Topic | java.lang.String | | The existing Kafka topic to write to (will not be created if it does not exist).
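For illustration, a target that passes a producer property through KafkaConfig and partitions events by a METADATA field might look like the following sketch (the topic, stream, and field names are hypothetical):

CREATE TARGET OrdersTarget USING KafkaWriter VERSION '0.11.0' (
  brokerAddress: 'localhost:9092',
  topic: 'Orders',
  KafkaConfig: 'acks=all',
  PartitionKey: '@METADATA(TableName)'
)
FORMAT USING JSONFormatter ()
INPUT FROM OrdersStream;

If a producer property value itself contained a semicolon or an equals sign, KafkaConfigPropertySeparator or KafkaConfigValueSeparator could be set to an alternate character, for example KafkaConfigPropertySeparator: '|'.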

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Notes on the KafkaConfig property

With the exceptions noted in the following table, you may specify any Kafka producer property in KafkaConfig. 

Kafka producer property | notes
------------------------|------
acks | In sync mode, may be set to 1 or all. In async mode, may be set to 0, 1, or all.
batch.size, linger.ms, retries | In sync mode, to prevent out-of-order events, any values set for these properties are ignored and Striim handles them internally. In async mode, Striim updates the Kafka producer properties and they are handled by Kafka.
enable.idempotence | When using version 2.1.0 and async mode, set to true to write events in order.
key.deserializer, value.deserializer | The value is always org.apache.kafka.common.serialization.ByteArrayDeserializer and cannot be overridden by KafkaConfig.

Internally, KafkaWriter invokes KafkaConsumer for various purposes. The WARNING from the consumer API about passing KafkaConfig properties can be safely ignored. See Configuring Kafka for more information about Kafka producer properties.
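For example, with version 2.1.0 in async mode, in-order writes could be requested as in this sketch (the broker address, topic, and stream names are placeholders):

CREATE TARGET InOrderTarget USING KafkaWriter VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'MyTopic',
  mode: 'async',
  KafkaConfig: 'enable.idempotence=true'
)
FORMAT USING JSONFormatter ()
INPUT FROM MyStream;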

KafkaWriter sample application

The following sample code writes data from PosDataPreview.csv to the Kafka topic KafkaWriterSample. This topic already exists in Striim's internal Kafka instance. If you are using an external Kafka instance, you must create the topic before running the application.

CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0' (
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;

You can verify that data was written to Kafka by running the KafkaReader sample application.
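If you do not want to run that full sample, a minimal check along these lines may work. This is only a sketch, not the sample application itself; consult the KafkaReader documentation for the exact properties, which are assumed here to include brokerAddress, topic, and startOffset:

CREATE SOURCE KafkaCheckSource USING KafkaReader VERSION '0.11.0' (
  brokerAddress: 'localhost:9092',
  topic: 'KafkaWriterSample',
  startOffset: 0
)
PARSE USING DSVParser (
  header: no
)
OUTPUT TO KafkaCheckStream;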

The first field in the output (position) stores information required to avoid lost or duplicate events after recovery (see Recovering applications). If recovery is not enabled, its value is NULL.

MON output (see Using the MON command) for targets using KafkaWriter includes:

  • in async mode only, Sent Bytes Rate: how many megabytes per second were sent to the brokers

  • in both sync and async mode, Write Bytes Rate: how many megabytes per second were written by the brokers and acknowledged to Striim
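For example, these statistics could be inspected from the Striim console with something like the following (the namespace and target name here are hypothetical):

mon Samples.KW11Sample;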

Enabling compression

When you enable compression in KafkaWriter, the broker and consumer should handle the compressed batches automatically. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfig:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gzip, lz4, and snappy. For example:

KafkaConfig:'compression.type=snappy'