Striim 3.9.4 / 3.9.5 documentation

Persisting a stream to Kafka

For an overview of this feature, see Introducing Kafka streams.

Note

Kafka and Zookeeper must be running when you create a Kafka-persisted stream, persist an existing stream, or import an application containing one.

CREATE STREAM <name> OF <type> PERSIST [USING <property set>]

To enable replay of a stream by persisting it to Kafka (see Replaying events using Kafka streams), use the syntax CREATE STREAM <name> OF <type> PERSIST USING <namespace>.<property set>, where <property set> is the name of a set of Kafka server properties. To persist to a Striim cluster's integrated Kafka broker, use the property set Global.DefaultKafkaProperties, for example:

CREATE STREAM MyStream OF MyStreamType PERSIST USING Global.DefaultKafkaProperties;

To persist to an external Kafka broker, specify a custom property set, created as described in Configuring Kafka, in place of Global.DefaultKafkaProperties.
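
For example, a custom property set for an external broker might be created and used as follows. This is a minimal sketch: the property names zk.address and bootstrap.brokers and the addresses shown are assumptions to be verified against Configuring Kafka.

CREATE PROPERTYSET MyKafkaProperties (
  zk.address:'203.0.113.10:2181',        -- assumption: Zookeeper address of the external cluster
  bootstrap.brokers:'203.0.113.11:9092'  -- assumption: external Kafka broker address
);

CREATE STREAM MyStream OF MyStreamType
  PERSIST USING MyKafkaProperties;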

The persisted stream remains memory-resident and may be used in a window or CQ in the usual way. Alternatively, the persisted data may be read by KafkaReader using the topic name <namespace>_<stream name> (see Reading a Kafka stream with KafkaReader). To use persisted stream data from the integrated Kafka broker outside of Striim, see Reading a Kafka stream with an external Kafka consumer.
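
For example, assuming MyStream was created in namespace ns1 and persisted to the integrated broker, a KafkaReader source along the following lines could read it back. This is a sketch only: the broker address, KafkaReader version, startOffset value, and StriimParser are assumptions to be verified against Reading a Kafka stream with KafkaReader.

CREATE SOURCE PersistedStreamReader USING KafkaReader VERSION '0.11.0' (
  brokerAddress:'localhost:9092',  -- assumption: integrated broker on the default port
  Topic:'ns1_MyStream',            -- topic name is <namespace>_<stream name>
  startOffset:0                    -- assumption: 0 reads the topic from the beginning
)
PARSE USING StriimParser ()
OUTPUT TO ReplayStream;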

Limitations:

  • Kafka streams may be used only on the output of a source or the output of a CQ that parses a source.

  • Implicit streams may not be persisted to Kafka.

  • In an application or flow running in a Forwarding Agent, a source or CQ may output to a Kafka stream, but any further processing of that stream must take place on the Striim server.

  • If the Kafka broker setting delete.topic.enable is false (the default in all Kafka releases prior to 1.0.0, including 0.11) and you drop a Striim application containing a Kafka stream after a crash, creating the stream will fail when you reload the application. To avoid this, set delete.topic.enable=true, as shown below.
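
The delete.topic.enable setting is standard Kafka broker configuration (not a Striim property); it goes in each broker's server.properties and requires a broker restart to take effect:

# in each Kafka broker's server.properties
delete.topic.enable=true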

Because implicit streams cannot be persisted, the Kafka stream must be explicitly created before the source or CQ that populates it. Using MultiLogApp as an example, to persist the raw output of the access log source:

CREATE STREAM RawAccessStream OF Global.WAEvent
  PERSIST USING Global.DefaultKafkaProperties;

CREATE SOURCE AccessLogSource USING FileReader (
  directory:'Samples/MultiLogApp/appData',
  wildcard:'access_log',
  positionByEOF:false
)
PARSE USING DSVParser (
  ignoreemptycolumn:'Yes',
  quoteset:'[]~"',
  separator:'~'
)
OUTPUT TO RawAccessStream;

Alternatively, to persist the output of the CQ that parses that raw output:

CREATE TYPE AccessLogEntry (
    srcIp String KEY ...
);
CREATE STREAM AccessStream OF AccessLogEntry
  PERSIST USING Global.DefaultKafkaProperties;

CREATE CQ ParseAccessLog 
INSERT INTO AccessStream
SELECT data[0] ...
FROM RawAccessStream;

To distribute events among multiple Kafka partitions, use PARTITION BY <field>:

CREATE STREAM AccessStream OF AccessLogEntry
  PERSIST USING Global.DefaultKafkaProperties
  PARTITION BY srcIp;

All events with the same value in <field> will be written to the same randomly selected Kafka partition. Striim distributes the data evenly among the partitions to the extent allowed by the frequency of the <field> values. For example, if 80% of the events have the same <field> value, then one of the Kafka partitions will contain at least 80% of the events.

By default, events may be distributed among up to 200 Kafka partitions. See Configuring Kafka for more information.

Dropping a persisted stream will automatically delete the associated Kafka topics.
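
For example, the persisted stream created earlier could be dropped as follows (assuming it is not in use by a deployed application):

DROP STREAM AccessStream;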

If recovery (see Recovering applications) is enabled for an application containing a Kafka stream, the persisted data will include "CheckPoint" events used by the recovery process.
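
For reference, recovery is enabled when the application is created, for example (the interval shown is illustrative; see Recovering applications for the full syntax):

CREATE APPLICATION MyKafkaApp RECOVERY 10 SECOND INTERVAL;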