Striim 3.9.6 documentation

OPCUAReader

Reads data from an OPC-UA server. See the OPC UA Unified Architecture Specification, Part 4: Services for information on the MonitoredItem, Session, and Subscription service sets used by this reader.

If PKCS 12 is required, generate a certificate for Striim and add its public key in DER format to the the OPC-UA server's keystore. See OPC UA Unified Architecture Specification, Part 6: Mappings, Annex E.

property

type

default value

notes

appURI

java.lang.String

URI to connect to the OPC-UA server: for example, rn:striim:opcua:sensor:connector

If a PKCS 12 certificate has been provided, make sure that its AppUriName field matches this value.

clientAlias

java.lang.String

PKCS 12 keystore certificates alias, required when keystore is specified

keepAliveCount

java.lang.Long

10

number of publish intervals with no data after which Striim will send an empty notification to let the server know the client is still running

keystore

java.lang.String

fully qualified name of PKCS 12 certificate: for example, /path/to/certificate.pfx

keystorePassword

com.webaction. security.password

PKCS 12 keystore password, required when keystore is specified

lifeTimeCount

java.lang.Long

20

number of publish intervals with no requests from Striim after which the OPC-UA server will assume that the Striim client is no longer running and remove its subscription; must be higher than the keepAliveCount

maxNotificationPerPublish

java.lang.Integer

2

number of events to receive per published notification

messageSecurityMode

java.lang.String

None

supported values (case-sensitive) are None, Sign, SignAndEncrypt: see  OPC UA Unified Architecture Specification, Part 4: Services, section 7.15, and Part 6: Mappings, section 6.1

nodeIdList

java.lang.String

comma-separated list (case sensitive) of variable or object nodes to be monitored: for example, ns=1;s=Sensor/Temperature,ns=2;s=Sensor/Pressure; if an object node is specified, all its variables are returned

OPCUAEndpointURL

java.lang.String

endpoint for the OPC-UA server: for example, opc.tcp://localhost:12686/example

password

com.webaction. security.password

the password for the specified username

publishInterval

java.lang.Double

2000

how often (in milliseconds) the OPC-UA server checks for requests from Striim

Do not set to -1 or 0. Instead of using a very small publishInterval, consider using a smaller samplingInterval.

queueSize

java.lang.Integer

10

see OPC UA Unified Architecture Specification, Part 4: Services, section 5.12.15

readTimeOut

java.lang.Long

10000

time (in milliseconds) to wait for the server to respond to a connection request before crashing the application

samplingInterval

java.lang.Double

2000

how often (in milliseconds) the OPC-UA updates the values for the variables specified in nodeIdList

securityPolicy

java.lang.String

None

supported values (case-sensitive) are Basic128Rsa15, Basic256, Basic256Sha256, and None: see OPC UA Unified Architecture Specification, Part 7: Profiles, sections 6.5.147-150

severity

java.lang.Integer

15

see OPC UA Unified Architecture Specification, Part 5: Information Model, section 6.4.2

username

java.lang.String

see OPC UA Unified Architecture Specification, Part 4: Services, section 7.36.3

The output format is OPCUADataChangeEvent:

Striim field / type

UA built-in type / field name / data type

notes

dataValue / java.lang.Object

DataValue / Value / Variant

the new value after the change 

dataIsNull / java.lang.Boolean

dataIsNotNull / java.lang.Boolean

dataTypeNodeId / java.lang.String

NodeId

see OPC UA Unified Architecture Specification, Part 6: Mappings, section 5.2.2.9 and A.3

sourceTime / java.lang.Long

DataValue / SourceTimestamp / DateTime

the time the change was made in the source device or program

sourcePicoSeconds / java.lang.Long

DataValue / SourcePicoSeconds / UInt16

see OPC UA Unified Architecture Specification, Part 6: Mappings, section 5.2.2.17

serverTime / java.lang.Long

DataValue / ServerTimestamp / DateTime

the time the server recorded the change

serverPicoSeconds / java.lang.Long

DataValue / ServerPicoSeconds / UInt16

see OPC UA Unified Architecture Specification, Part 6: Mappings, section 5.2.2.17

statusCodeValue / java.lang.Long

DataValue / Status / StatusCode

see https://github.com/OPCFoundation/UA-.NET/blob/master/Stack/Core/Schema/Opc.Ua.StatusCodes.csv for a list of possible values

statusCodeIsGood / java.lang.Boolean

derived from http://www.opcfoundation.org/UA/schemas/1.02/StatusCode.csv

statusCodeIsBad / java.lang.Boolean

derived from http://www.opcfoundation.org/UA/schemas/1.02/StatusCode.csv

statusCodeIsUncertain / java.lang.Boolean

derived from http://www.opcfoundation.org/UA/schemas/1.02/StatusCode.csv

statusCodehasOverflowSet / java.lang.Boolean

see OPC UA Unified Architecture Specification, Part 4: Services, section 5.12.1.5

metadata / java.util.Map

see example in the sample event below

Sample OPCUADataChangeEvent:

{
  dataValue: 3.14
  dataIsNull: false
  dataIsNotNull: true
  dataTypeNodeId: "ns=0;i=11"
  sourceTime: 131517670918430000
  sourcePicoSeconds: null
  serverTime: 131517670918430000
  serverPicoSeconds: null
  statusCodeValue: 0
  statusCodeIsGood: true
  statusCodeIsBad: false
  statusCodeIsUncertain: false
  statusCodehasOverflowSet: false
  metadata: 
{
    "Description": {
        "locale": null,
        "text": null
    },
    "monitoringMode": "Reporting",
    "requestedQueueSize": 10,
    "readValueId": {
        "typeId": {
            "namespaceIndex": 0,
            "identifier": 626,
            "type": "Numeric",
            "null": false,
            "notNull": true
        },
        "nodeId": {
            "namespaceIndex": 2,
            "identifier": "HelloWorld/ScalarTypes/Double",
            "type": "String",
            "null": false,
            "notNull": true
        },
        "binaryEncodingId": {
            "namespaceIndex": 0,
            "identifier": 628,
            "type": "Numeric",
            "null": false,
            "notNull": true
        },
        "xmlEncodingId": {
            "namespaceIndex": 0,
            "identifier": 627,
            "type": "Numeric",
            "null": false,
            "notNull": true
        },
        "attributeId": 13,
        "indexRange": null,
        "dataEncoding": {
            "namespaceIndex": 0,
            "name": null,
            "null": true,
            "notNull": false
        }
    },
    "ValueRank": -1,
    "requestedSamplingInterval": 2000.0,
    "revisedSamplingInterval": 2000.0,
    "filterResult": {
        "bodyType": "ByteString",
        "encoded": null,
        "encodingTypeId": {
            "namespaceIndex": 0,
            "identifier": 0,
            "type": "Numeric",
            "null": true,
            "notNull": false
        }
    },
    "BrowseName": {
        "namespaceIndex": 2,
        "name": "Double",
        "null": false,
        "notNull": true
    },
    "ArrayDimensions": "-1",
    "NodeId": {
        "namespaceIndex": 2,
        "identifier": "HelloWorld/ScalarTypes/Double",
        "type": "String",
        "null": false,
        "notNull": true
    },
    "DataType": "Double",
    "clientHandle": 11,
    "monitoredItemId": 11,
    "revisedQueueSize": 10
}
}

The following sample application writes OPC-UA data to an HBase table. The table contains the most recently reported value for each node. The application is divided into two flows so that the source and CQ can run in a Forwarding Agent on the OPC-UA server.

CREATE APPLICATION OPCUAapp;

CREATE FLOW OPCUAFlow;

CREATE SOURCE OPCUASource USING OPCUAReader (
  OPCUAEndpointURL:'opc.tcp://mfactorengineering.com:4840',
  nodeIdList:'ns=1;s=EVR2.state.Empty_Box_Timer'
)
OUTPUT TO NotificationStream;

CREATE TYPE OPCUADataChange (
  data java.lang.Object,
  dataIsNull java.lang.Boolean,
  dataIsNotNull java.lang.Boolean,
  dataTypeNodeId java.lang.Object,
  sourceTime java.lang.Long,
  serverTime java.lang.Long,
  sourcePicoSeconds java.lang.Long,
  serverPicoSeconds java.lang.Long,
  statusCodeValue java.lang.Long,
  statusCodeGood java.lang.Boolean,
  statusCodeBad java.lang.Boolean,
  statusCodeUncertain java.lang.Boolean,
  statusCodeHasOverflowSet java.lang.Boolean,
  dataType java.lang.String,
  valueRank java.lang.Integer,
  arrayDimensions java.lang.Object,
  BrowseName java.lang.Object,
  Description java.lang.Object,
  monitoringMode java.lang.String,
  nodeIdNS java.lang.Integer,
  nodeIdIdentifier java.lang.Object,
  displayName java.lang.String  KEY
);
CREATE STREAM OPCUAStream OF OPCUADataChange;

CREATE CQ OPCUADataCQ
INSERT INTO OPCUAStream
SELECT dataValue,
  dataIsNull,
  dataIsNotNull,
  dataTypeNodeId,
  sourceTime,
  serverTime,
  sourcePicoSeconds,
  serverPicoSeconds,
  statusCodeValue,
  statusCodeIsGood,
  statusCodeIsBad,
  statusCodeIsUncertain,
  statusCodehasOverflowSet,
  META(x,"DataType"),
  META(x,"ValueRank"),
  TO_JSON_NODE(META(x,"ArrayDimensions")),
  TO_JSON_NODE(META(x,"BrowseName")),
  META(x,"Description"),
  META(x,"monitoringMode"),
  TO_JSON_NODE(META(x,"NodeId")).get("namespaceIndex").asInt(),
  TO_JSON_NODE(META(x,"NodeId")).get("identifier"),
  TO_JSON_NODE(META(x,"displayName")).get("text").asText()
FROM NotificationStream x;

END FLOW OPCUAFlow;

CREATE FLOW HBASEFlow;
CREATE TARGET HBaseTarget USING HBaseWriter (
  HBaseConfigurationPath:"/path/to/hbase/conf/hbase-site.xml",
  Tables: "OPCUAEvents.data",
  PKUpdateHandlingMode: "DELETEANDINSERT"
)
INPUT FROM OPCUAStream;
END FLOW HBASEFlow;

END APPLICATION OPCUAapp;