Striim 3.9.4 / 3.9.5 documentation

Creating an open processor component

A Striim open processor contains a custom Java application that reads data from a window or stream, processes it, optionally enriching it with data from a cache, and writes to an output stream.
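The read, enrich, write flow described above can be pictured with a small plain-Java sketch. It does not use the Striim SDK: the `Sale` and `EnrichedSale` classes, the `process` method, and the sample values are all hypothetical stand-ins, with a `Map` playing the role of the cache and a `List` playing the role of each stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual stand-in only: none of these names come from the Striim SDK.
class EnrichSketch {

    // Hypothetical input event.
    static final class Sale {
        final String merchantId;
        final double amount;

        Sale(String merchantId, double amount) {
            this.merchantId = merchantId;
            this.amount = amount;
        }
    }

    // Hypothetical output event: the input fields plus a value looked up in the cache.
    static final class EnrichedSale {
        final String merchantId;
        final double amount;
        final Integer hourlyAve; // null when the cache has no entry for the merchant

        EnrichedSale(String merchantId, double amount, Integer hourlyAve) {
            this.merchantId = merchantId;
            this.amount = amount;
            this.hourlyAve = hourlyAve;
        }
    }

    // Reads each input event, enriches it from the cache (keyed by merchantId),
    // and collects the result as the "output stream".
    static List<EnrichedSale> process(List<Sale> input, Map<String, Integer> cache) {
        List<EnrichedSale> output = new ArrayList<>();
        for (Sale sale : input) {
            output.add(new EnrichedSale(sale.merchantId, sale.amount, cache.get(sale.merchantId)));
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new HashMap<>();
        cache.put("m1", 100); // made-up sample data

        List<Sale> input = new ArrayList<>();
        input.add(new Sale("m1", 12.5));
        input.add(new Sale("m2", 3.0));

        for (EnrichedSale e : process(input, cache)) {
            System.out.println(e.merchantId + " " + e.amount + " " + e.hourlyAve);
        }
    }
}
```

In a real open processor the SDK supplies the input batch and the method for writing output, as the example in step 4 shows.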

The SDK includes the following:

  • StriimOpenProcessor-SDK.jar, which contains classes to be included in the Java application, installed with Striim at striim/StriimSDK.

  • A Javadoc API reference for methods you may use in your Java application, installed with Striim at striim/docs/StriimOpenProcessor-SDKdocs.

The component must be built with Maven, since it requires the Maven Shade Plugin.

An open processor can be used only in the Striim namespace from which the types are exported.

The following simple example shows all the steps required to create an open processor and use it in a Striim application.

Step 1: define the input and output streams in Striim

The following TQL defines the input and output streams for the example open processor you will add later. It includes a FileReader source, a cache that will be specified in the open processor's ENRICH option, and a FileWriter target.

CREATE NAMESPACE ns1;
USE ns1;
CREATE APPLICATION OPExample;

CREATE source CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;
 
CREATE TYPE MerchantHourlyAve(
  merchantId String,
  hourValue integer,
  hourlyAve integer
);

CREATE CACHE HourlyAveLookup using FileReader (
  directory: 'Samples/PosApp/appData',
  wildcard: 'hourlyData.txt'
)
PARSE USING DSVParser (
  header: Yes,
  trimquote:false,
  trimwhitespace:true
) 
QUERY (keytomap:'merchantId') 
OF MerchantHourlyAve;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream partition by merchantId
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_INT(data[9]) as zip
FROM CsvStream;
 
CREATE CQ cq2
INSERT INTO SendToOPStream
SELECT makeList(dateTime) as dateTime,
  makeList(zip) as zip
FROM PosDataStream;
 
CREATE TYPE ReturnFromOPStream_Type ( time DateTime , val Integer );
CREATE STREAM ReturnFromOPStream OF ReturnFromOPStream_Type;

CREATE TARGET OPExampleTarget 
USING FileWriter (filename: 'OPExampleOut') 
FORMAT USING JSONFormatter() 
INPUT FROM ReturnFromOPStream;
 
END APPLICATION OPExample;
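
The CsvToPosData query above derives hourValue from the timestamp field using the pattern 'yyyyMMddHHmmss'. As a rough illustration of that derivation, the following sketch performs the same parse in plain Java. It assumes that DHOURS yields the hour-of-day field, it uses java.time rather than the Joda-Time classes Striim works with, and the class name, method name, and sample timestamp are made up for the example.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustration only: approximates TO_DATEF(..., 'yyyyMMddHHmmss') followed by DHOURS(...),
// assuming DHOURS returns the hour of day (0-23).
class HourValueSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Parses a compact timestamp string and returns its hour-of-day field.
    static int hourValue(String raw) {
        return LocalDateTime.parse(raw, FORMAT).getHour();
    }

    public static void main(String[] args) {
        // Made-up sample value in the same layout as the pattern.
        System.out.println(hourValue("20130312173210")); // prints 17
    }
}
```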

Step 2: export the input and output stream types

If you create OPExample in the ns1 namespace, the following Striim console command will export the types from the application to /home/myhome/OpExampleTypes.jar:

EXPORT TYPES OF ns1.OPExample TO "/home/myhome/OpExampleTypes.jar";

The EXPORT TYPES command requires read permission on the namespace. If you do not specify a path, the file will be created in the striim directory.

Step 3: set up Maven

Install the SDK and exported types .jar files:

mvn install:install-file -DgroupId=com.example -DartifactId=OpenProcessorSDK \
  -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar -Dfile=/opt/striim/StriimSDK/StriimOpenProcessor-SDK.jar
mvn install:install-file -DgroupId=com.example -DartifactId=OPExample -Dversion=1.0.0-SNAPSHOT \
  -Dpackaging=jar -Dfile=/home/myhome/OpExampleTypes.jar

Create a Maven project in which you will create your custom Java application:

mvn archetype:generate -DgroupId=com.example.opexample -DartifactId=opexample \
  -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Replace the default pom.xml created by Maven with the following, adjusting as necessary for your environment:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.opexample</groupId>
  <artifactId>opexample</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>opexample</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
<!-- OpenProcessorSDK jar -->
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>OpenProcessorSDK</artifactId>
            <version>1.0.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
<!-- exported types jar -->
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>OPExample</artifactId>
            <version>1.0.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
  </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-release-plugin</artifactId>
                <version>2.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
<!-- 
The output SCM filename is defined here.
-->
                    <finalName>OPExample.scm</finalName>
                    <transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <manifestEntries>
                                <Striim-Module-Name>OPExample</Striim-Module-Name>
                                <Striim-Service-Interface>
                                  com.webaction.runtime.components.openprocessor.StriimOpenProcessor
                                </Striim-Service-Interface>
                                <Striim-Service-Implementation>
                                  com.example.opexample.App
                                </Striim-Service-Implementation>
                            </manifestEntries>
                        </transformer>
                    </transformers>
                    <artifactSet>
                        <excludes>
                            <exclude>org.slf4j:*</exclude>
                            <exclude>log4j:*</exclude>
                        </excludes>
                    </artifactSet>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>com.coderplus.maven.plugins</groupId>
                <artifactId>copy-rename-maven-plugin</artifactId>
                <version>1.0</version>
                <executions>
                    <execution>
                        <id>copy-file</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy</goal>
                        </goals>
<!--
The location and name for the .scm file to be imported into Striim is defined here.
Preferred location is module/modules folder under the Maven project main folder.
-->
                        <configuration>
    <sourceFile>/home/myhome/opexample/target/OPExample.scm.jar</sourceFile>
    <destinationFile>/home/myhome/Documents/opexample/modules/OpExample.scm</destinationFile>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
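
The manifestEntries configured in the pom.xml above end up in the manifest of the shaded file. As a hedged sketch, the following standalone Java program checks that a built file's manifest contains all three entries. The class and method names are hypothetical, and the program is not part of the Striim SDK; it relies only on the standard java.util.jar classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper: reports which of the manifest entries set in pom.xml are absent.
class ManifestCheckSketch {

    // The three entry names configured under <manifestEntries> in the pom.xml.
    static final String[] CONFIGURED = {
        "Striim-Module-Name",
        "Striim-Service-Interface",
        "Striim-Service-Implementation"
    };

    // Returns the names of configured entries that are missing or blank in the manifest.
    static List<String> missingEntries(Manifest manifest) {
        List<String> missing = new ArrayList<>();
        Attributes main = manifest.getMainAttributes();
        for (String name : CONFIGURED) {
            String value = main.getValue(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: java ManifestCheckSketch.java <path to .scm or .jar>");
            return;
        }
        // JarFile reads any zip-format archive, whatever its file extension.
        try (JarFile jar = new JarFile(args[0])) {
            Manifest manifest = jar.getManifest();
            if (manifest == null) {
                System.out.println("No manifest found");
            } else {
                System.out.println("Missing entries: " + missingEntries(manifest));
            }
        }
    }
}
```

Run it against the file produced by mvn package; an empty list means all three entries are present.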

Step 4: write your Java application and build the .scm

Replace the default App.java with the following:

package com.example.opexample;
 
import wa.ns1.SendToOPStream_Type_1_0;
import wa.ns1.ReturnFromOPStream_Type_1_0;
 
import com.webaction.anno.PropertyTemplateProperty;
import com.webaction.runtime.components.openprocessor.StriimOpenProcessor;
import org.joda.time.DateTime;
 
import com.webaction.anno.AdapterType;
import com.webaction.anno.PropertyTemplate;
import com.webaction.runtime.containers.WAEvent;
 
import com.webaction.runtime.containers.IBatch;
import java.util.*;
 
@PropertyTemplate(name = "TupleConverter", type = AdapterType.process,
properties = {
@PropertyTemplateProperty(name="ahead", type=Integer.class, required=true, defaultValue="0"),
@PropertyTemplateProperty(name="lastItemSeen", type=Boolean.class, required=true, defaultValue="0")
},
// outputType and inputType are named from Striim's point of view: outputType is what a native
// Striim component sends to your custom component, and inputType is what your custom component
// sends back to a native Striim component.
outputType = SendToOPStream_Type_1_0.class,
inputType = ReturnFromOPStream_Type_1_0.class
)
public class App extends StriimOpenProcessor
{

    public void run() {
        // Get the batch of events that arrived on the input.
        IBatch<WAEvent> batch = getAdded();
        Iterator<WAEvent> it = batch.iterator();
        while (it.hasNext()) {
            SendToOPStream_Type_1_0 input = (SendToOPStream_Type_1_0) it.next().data;
            //  ... Additional operations
        }

        // Build one output event holding the current time and a random value from 1 to 50.
        ReturnFromOPStream_Type_1_0 result = new ReturnFromOPStream_Type_1_0();
        result.time = DateTime.now();
        Random rand = new Random(System.currentTimeMillis());
        result.val = rand.nextInt(50) + 1;

        // Pass the event on to the output stream.
        send(result);
    }

    public void close() throws Exception {
        // TODO Auto-generated method stub

    }

    public Map getAggVec() {
        // TODO Auto-generated method stub
        return null;
    }

    public void setAggVec(Map aggVec) {
        // TODO Auto-generated method stub

    }
}
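
The control flow of run() above (walk the incoming batch, then emit a single event whose val is between 1 and 50) can be tried out without a Striim server. The sketch below is a stand-in only: `Result`, the `List<String>` batch, and the `Consumer` used in place of send() are hypothetical substitutes for the SDK and exported types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;

// Stand-in for the example's run() logic; no Striim classes are involved.
class RunLoopSketch {

    // Hypothetical substitute for the exported output type.
    static final class Result {
        long timeMillis;
        int val;
    }

    // Iterates over the batch, then emits exactly one Result per call.
    static void run(List<String> batch, Random rand, Consumer<Result> send) {
        for (String event : batch) {
            // ... per-event work would go here
        }
        Result result = new Result();
        result.timeMillis = System.currentTimeMillis();
        result.val = rand.nextInt(50) + 1; // nextInt(50) is 0..49, so val is 1..50
        send.accept(result);
    }

    public static void main(String[] args) {
        List<Result> sent = new ArrayList<>();
        run(Arrays.asList("a", "b", "c"), new Random(), sent::add);
        System.out.println(sent.size() + " event sent, val=" + sent.get(0).val);
    }
}
```

Note that the output event is created after the loop, so each call produces one output event no matter how many input events the batch holds.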

Change to the opexample directory created by Maven and enter mvn package.

Step 5: import the .scm into Striim

Copy opexample/modules/OpExample.scm to a directory accessible by the Striim server, then use the following console command to load it:

LOAD OPEN PROCESSOR "<path>/OpExample.scm";

Alternatively, you may load it in Flow Designer at Configuration > App settings > Load / unload open processor.

Step 6: add the open processor to your application

Return to the application you created in step 1, open the Base Components section of the component palette, drag a Striim Open Processor into the workspace, set its options as follows, and click Save. Note that Ahead and Last Item Seen are defined by the Java class. The other properties will appear in all open processor components.

[Image: ExampleOP_settings.png]

If you run the application, it will create output files in the striim directory.