Striim 3.9.4 / 3.9.5 documentation

AzureEventHubWriter

Writes to an existing Azure event hub, which is equivalent to a Kafka topic.

Azure Event Hubs is similar to Kafka, is compatible with many Kafka tools, and uses some of the same architectural elements, such as consumer groups and partitions. AzureEventHubWriter is generally similar to KafkaWriter in sync mode, and its output formats are the same.

This release uses version 2.2.0 of the azure-event-hubs-java API.

property

type

default value

notes

Batch Policy

java.lang.String

Size:1000000, Interval:30s

The batch policy may include size or interval. Cached data is written to the target every time either of the specified values is exceeded. With the default setting, data will be written every 30 seconds or sooner if the cache contains 1,000,000 bytes. When the application is stopped, any remaining data in the buffer is discarded.
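For example, to write whenever the cache reaches 500,000 bytes or every ten seconds, whichever comes first (illustrative values; the property name is shown in the no-spaces form used in the TQL samples below), the target definition could include:

  BatchPolicy:'Size:500000, Interval:10s'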

Connection Retry

java.lang.String

Retries:0, RetryBackOff:1m

With the default Retries:0, retry is disabled. To enable retries, set a positive value for Retries and, in RetryBackOff, specify the interval between retries in minutes (#m) or seconds (#s). For example, with the setting Retries:3, RetryBackOff:30s, if the first connection attempt is unsuccessful, Striim will try again in 30 seconds. If the second attempt is unsuccessful, Striim will try again in another 30 seconds. If the third attempt is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.
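For example, the Retries:3, RetryBackOff:30s setting described above would appear in a target definition as (property name shown in the no-spaces form used in the TQL samples below):

  ConnectionRetry:'Retries:3, RetryBackOff:30s'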

Consumer Group

java.lang.String

If E1P is true, specify an Event Hub consumer group for Striim to use for tracking which events have been written.

E1P

java.lang.Boolean

false

With the default value, after recovery (see Recovering applications) there may be some duplicate events. Set to true to ensure that there are no duplicates ("exactly once processing"). If recovery is not enabled for the application, this setting has no effect.

When this property is set to true, the target event hub must be empty the first time the application is started, and other applications must not write to the event hub.

Known issue DEV-16624: When set to true, AzureEventHubWriter will use approximately 42 MB of memory per partition, so if the hub has 32 partitions, it will use 1.3 GB.
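A minimal sketch of the two properties involved in exactly-once processing, assuming recovery is enabled for the application (the consumer group name here is illustrative):

  E1P:'true',
  ConsumerGroup:'striimConsumerGroup'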

Event Hub Name

java.lang.String

the name of the event hub, which must exist when the application is started and have between two and 32 partitions

Event Hub Namespace

java.lang.String

the namespace of the specified event hub

Operation Timeout

java.lang.String

1m

the amount of time Striim will wait for Azure to respond to a request (reading, writing, or closing a connection) before the application fails

Partition Key

java.lang.String

The name of a field in the input stream whose values determine how events will be distributed among multiple partitions. Events with the same partition key field value will be written to the same partition.

If the input stream is of any type except WAEvent, specify the name of one of its fields.

If the input stream is of the WAEvent type, specify a field in the METADATA map (see HP NonStop reader WAEvent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields, or MSSQLReader WAEvent fields) using the syntax @METADATA(<field name>), or a field in the USERDATA map (see Adding user-defined data to WAEvent streams) using the syntax @USERDATA(<field name>).
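For example, assuming a USERDATA field named city has been added to the stream (a hypothetical field for illustration), events could be distributed among partitions by city with:

  PartitionKey:'@USERDATA(city)'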

SAS Key

java.lang.String

the primary key associated with the SAS policy

SAS Policy Name

java.lang.String

an Azure SAS policy to authenticate connections (see Shared Access Authorization Policies)

For samples of the output, see:

If E1P is set to true, the records will contain information Striim can use to ensure no duplicate records are written during recovery (see Recovering applications).

The following sample application will write data from PosDataPreview.csv to an event hub.

CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET PosDataTarget USING AzureEventHubWriter (
  EventHubNamespace:'myeventhub-ns',
  EventHubName:'PosAppData',
  SASPolicyName:'RootManageSharedAccessKey',
  SASKey:'******',
  PartitionKey:'merchantId'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;

The following sample application will replicate data from two Oracle tables to two partitions in an event hub.

CREATE SOURCE OracleSource1 USING OracleReader (
  Username:'myname',
  Password:'******',
  ConnectionURL:'localhost:1521:XE',
  Tables:'QATEST.EMP;QATEST.DEPT'
)
OUTPUT TO sourceStream;

CREATE TARGET OracleQATEST USING AzureEventHubWriter (
  EventHubNamespace:'myeventhub-ns',
  EventHubName:'OracleData',
  SASPolicyName:'RootManageSharedAccessKey',
  SASKey:'******',
  PartitionKey:'@metadata(TableName)',
  E1P:'true',
  ConsumerGroup:'testconsumergroup'
)
FORMAT USING DSVFormatter()
INPUT FROM sourceStream;