Create Stream Application¶
Introduction¶
Stream applications are declarative specs that define the processing logic to process the events sent to the stream processor. A stream app definition contains the following configurations:
Configuration | Description |
---|---|
Stream | A logical series of events ordered in time with a uniquely identifiable name, and set of defined attributes with specific data types defining its schema. |
Source | This consumes data from external sources (such as TCP , Kafka , HTTP , etc) in the form of events, then converts each event (that can be in XML , JSON , binary , etc. format) to a stream event, and passes that to a stream for processing. |
Sink | This takes events arriving at a stream, maps them to a predefined data format (such as XML , JSON, binary , etc), and publishes them to external endpoints (such as E-mail , TCP , Kafka , HTTP , etc). |
Table | A structured representation of data stored with a defined schema. Stored data can be backed by In-Memory, or external data stores such as RDBMS, MongoDB, etc. The tables can be accessed and manipulated at runtime. |
Executional Element | An executional element can be one of the following:
|
Macrometa provide in-build source, sink and store explained in the later section of this document.
Creating a Stream Application¶
To create a stream application follow the steps below:
- Open the GUI. Click on
Stream Apps
tab. - Click on New to start defining a new stream application.
- Enter a Name as
SweetProductionAnalysis
or feel free to chose any other name for the stream application. - Enter a Description.
-
Add the following sample stream application.
@source(type = 'c8db', collection='SweetProductionData', @map(type='json')) define stream SweetProductionStream (name string, amount double); @sink(type= 'c8streams', stream='ProductionAlertStream', @map(type='json')) define stream ProductionAlertStream (name string, amount double); select * from SweetProductionStream insert into ProductionAlertStream;
-
Click
Save
to save the stream app. - Select all the regions to deploy your application in.
- Click on
Save
.
Source¶
C8Streams¶
Syntax
@source(type="c8streams", stream.list="<STRING>", replication.type="<STRING>", @map(...)))
Example
@source(type="c8streams", stream.list="OrderStream", replication.type="local", @map(type='json')))
define stream OrderStream(product_id string, quantity integer)
If @source
annotation is not provided, c8stream
is considered as a the default source. Stream application will use the c8stream with the default query parameters explained in the chart below. In the above example, stream can also be defined as
define stream OrderStream(product_id string, quantity integer)
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
stream.list | This specifies the list of streams to which the source must listen. This list can be provided as a set of comma-separated values e.g. stream_one,stream_two | STRING | No | |
replication.type | Specifies if the replication type of the streams. Possible values can be local and global | local | STRING | Yes |
C8DB¶
Syntax
@source(type = 'c8db', collection="STRING", replication.type="STRING", collection.type="STRING", @map(...))
Example
@source(type = 'c8db', collection='SweetProductionData', @map(type='json'))
define stream SweetProductionStream (name string, amount double);
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
collection | This specifies the name of the c8db collection to which the source must listen. | STRING | No | |
replication.type | Specifies if the replication type of the c8db collection. Possible values can be local and global | local | STRING | Yes |
collection.type | This specifies the type of the data collection contains. Possible values can be doc and edge . | doc | STRING | Yes |
Sink¶
C8Streams¶
Syntax
@sink(type="c8streams", stream="<STRING>", replication.type="<STRING>", @map(...)))
Example
@sink(type= 'c8streams', stream='ProductionAlertStream', @map(type='json'))
define stream ProductionAlertStream (name string, amount double);
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
stream | The streams to which the C8Stream sink needs to publish events. | STRING | No | |
replication.type | Specifies if the replication type of the stream. Possible values can be local and global | local | STRING | Yes |
Table¶
C8DB¶
Syntax
@store(type = 'c8db', collection="STRING", replication.type="STRING", collection.type="STRING", from="STRING", to="STRING")
Example
@store(type = 'c8db', collection='SweetProductionCollection')
define table SweetProductionCollection (name string, amount double);
If @store
annotation is not provided, c8db is considered as a the default store. Stream application will use the c8db with the default query parameters explained in the chart below. In the above example, store can also be defined as
define table SweetProductionCollection (name string, amount double);
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
collection | This specifies the name of the c8db collection to which events must written. | STRING | No | |
replication.type | Specifies if the replication type of the c8db collection. Possible values can be local and global | local | STRING | Yes |
collection.type | This specifies the type of the data collection contains. Possible values can be doc and edge . | doc | STRING | Yes |
from | If collection.type is specified as edge , this field indicates which field to be considered as a source node of the edge. | _from | STRING | Yes |
to | If collection.type is specified as edge , this field indicates which field to be considered as a destination node of the edge. | _to | STRING | Yes |
Tutorials¶
Following tutorials cover various user scenarios using Macrometa Stream Processing.
- Publishing Data
- Consuming Data
- Filtering Data
- Transforming Data
- Enriching Data
- Executing Scripts →
- Correlating Data
- Summarizing Data
- Data Pipelines
- Exposing Data as API
Please refer to Reference for additional stream processing examples.