Striim 3.9.4 / 3.9.5 documentation

Using Apache Flume

Striim can receive data from Apache Flume using the WebActionSink (see Apache Flume integration) as a source. Its properties are defined in configuration files on the Flume server rather than in TQL.

The WebActionSink properties are:







watp:// <ip_address > :9080

the IP address and port of the Striim server (adjust the port number if you are not using the default)


the Striim login to be used by the WebActionSink


the password for that login

flume:<stream name>

specify the stream name to be used in TQL (see example below)



in this release, only DSVParser is supported

You must also specify the properties for the specified parser. See the example below.

The following example application assumes that Flume is running on the same system as Striim.

1. Perform the first two steps described in Apache Flume integration.

2. Save the following as a TQL file, then load, deploy, and start it:

CREATE STREAM flumeStream of Global.WAEvent;
CREATE TARGET flumeOut USING SysOut(name:flumeTest) INPUT FROM flumeStream;

This application does not need a CREATE SOURCE statement because the data is being collected and parsed by Flume. The stream name must match the one specified in the WebActionSink properties and the type must be Global.WAEvent.

2. Save the following as waflume.conf in the flume/conf directory, replacing the two IP addresses with the test system's IP address and the username and password with the credentials you used to load the application:

# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = netcatSrc
agent.channels = memoryChannel
agent.sinks = webactionSink

# For each one of the sources, the type is defined
agent.sources.netcatSrc.type = netcat
agent.sources.netcatSrc.bind =
agent.sources.netcatSrc.port = 41414

# The channel can be defined as follows.
agent.sources.netcatSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.webactionSink.type = com.webaction.flume.WebActionSink
agent.sinks.webactionSink.serverUri = watp://
agent.sinks.webactionSink.username = flumeusr
agent.sinks.webactionSink.password = passwd = flume:flumeStream
agent.sinks.webactionSink.parser.handler = DSVParser
agent.sinks.webactionSink.parser.blocksize = 256
agent.sinks.webactionSink.parser.columndelimiter = ","
agent.sinks.webactionSink.parser.rowdelimiter = "\n" 
agent.sinks.webactionSink.parser.charset = "UTF-8"
agent.sinks.webactionSink.parser.blockAsCompleteRecord = "True"

#Specify the channel the sink should use = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

3. Start Flume, specifying the configuration file:

bin/flume-ng agent --conf conf --conf-file conf/waflume.conf --name agent -

4. Save the following as flumetestdata.csv:


5. Open a terminal, change to the directory where you saved flumetestdata.csv, and enter the following command, replacing the IP address with the test system's:

cat flumetestdata.csv | nc 41414

The following output should appear in striim-node.log (see Reading log files):

flumeTest: WAEvent{
  data: ["ID","Name"]
  metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0}
  before: null
  dataPresenceBitMap: "AA=="
  beforePresenceBitMap: "AA=="
  typeUUID: null
flumeTest: WAEvent{
  data: ["100","first"]
  metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0}
  before: null
  dataPresenceBitMap: "AA=="
  beforePresenceBitMap: "AA=="
  typeUUID: null

See Parsing the data field of WAEvent for more information about this data format.