Striim 3.9.4 / 3.9.5 documentation

Using the Striim Forwarding Agent

The Striim Forwarding Agent is a stripped-down version of a Striim server that can be used to run sources and CQs locally on a remote host. Windows, caches, and other components are not supported.

To use the Agent, first follow the instructions in Installing the Striim Forwarding Agent. Then, in your application, create a flow for the source that will run on the agent, and deploy it to the agent's deployment group.

Here is a simple example that reads from a file on the remote host and writes to a file on the Striim server:

CREATE APPLICATION agentTest;

CREATE FLOW AgentFlow;
CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;
END FLOW AgentFlow;
  
CREATE FLOW ServerFlow;
CREATE TARGET t USING FileWriter(  filename:'AgentOut')
FORMAT USING DSVFormatter ()
INPUT FROM CsvStream
END FLOW ServerFlow;

END APPLICATION agentTest;

DEPLOY APPLICATION agentTest ON default
WITH AgentFlow ON ALL IN agent;

Be sure the Agent is running when you load the application. If there are multiple agents in the deployment group the source will automatically combine their data.

Note

CQs running on an Agent may not select from Kafka streams or include AS <field name>, GROUP BY, HAVING, ITERATOR, or MATCH_PATTERN.

The following variation on the beginning of the PosApp sample application filters out unneeded columns and partitions the stream (see Adapting TQL applications for multi-server deployment):

CREATE FLOW AgentFlow; 
 CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream partition by merchantId
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;

END FLOW AgentFlow; ...

PARTITION BY merchantId specifies that events with the same merchantId values will be processed on the same server. This is required for ServerFlow to be deployed to multiple servers. An unpartitioned source is automatically deployed to a single server.

See Filtering events using OUTPUT TO and WHERE for additional examples of filter syntax that are compatible with the Forwarding Agent.