Supported Integrations¶
NStack is built to integrate with existing infrastructure, event, and data-sources. Typically, this is by using them as sources and sinks in the NStack Workflow Language.
See also
Learn more about sources and sinks in Concepts
Sources¶
Schedule¶
Sources.schedule<()> {
cron = "* * * * * *"
}
NStack’s Schedule source allows you to run a workflow in intervals over a time period. It takes a single argument of a crontab, which specifies the interval to use.
Note that NStack’s scheduler expects six fields: minute, hour, day of month, month, day(s) of week, year. As the scheduler emits events, it is of type Unit, which is represented by ()
Postgres¶
Sources.postgres<Text> {
pg_host = "localhost", pg_port = "5432",
pg_user = "user", pg_password = "123456",
pg_database = "db", pg_query = "SELECT * FROM tbl;" }
pg_port
defaults to 5432, pg_user
defaults to postgres
, and
pg_password
defaults to the empty string. The other parameters are mandatory.
HTTP¶
Sources.http<Text> { http_path = "/foo" }
NStack’s HTTP source allows you to expose an NStack workflow as an HTTP endpoint with a very simple API.
The HTTP source must be configured with the http_path
,
a relative URL for the endpoint
on which it will listen for requests, e.g /foo
.
All HTTP source endpoints listen on port 8080
.
Calling¶
To call the endpoint, you need to send an HTTP request with the following properties:
Parameter | Value |
---|---|
verb | PUT or POST |
content-type | any allowed, but we suggest application/json |
The request should have a body
containing a single JSON object
with a single property called params
,
which contains a JSON encoded value
of the type expected by the Source.
e.g. { "params" : "Hello World" }
With curl
:
Using the command line utility curl
, you can easily run a single command to call the endpoint.
curl -X PUT -d '{ "params" : "Hello World" }' demo.nstack.com:8080/foo
With nstack send
:
The nstack
cli utility has a built-in send
command
for interacting with HTTP sources,
to use it just pass in the endpoint relative url,
and the JSON encoded value to send
(no need to specify the full params object).
nstack send "/foo" '"Hello World"'
Note
Note the double quoting on the value - the outer pair of (single) quotes are consumed by the shell, the inner quotes are part of the JSON representation for a string.
RabbitMQ (AMQP)¶
Sources.amqp<Text> {
amqp_host = "localhost", amqp_port = "5672",
amqp_vhost = "/", amqp_exchange = "ex",
amqp_key = "key"
}
amqp_port
defaults to 5672 and amqp_vhost
defaults to /
.
The other parameters are mandatory.
Stdin¶
Sources.stdin<Text>
Sources.stdin
has type Text
.
It does not take any arguments and does not require a type annotation,
but if the type annotation is present,
it must be Text
.
When Sources.stdin
is used as a process’s source,
you can connect to that process by running
nstack connect $PID
where $PID
is the process id
(as reported by nstack start
and nstack ps
).
After that,
every line fed to the standard input of nstack connect
will be passed to the process as a separate Text
value,
without the trailing newline.
To disconnect, simulate end-of-file by pressing Ctrl-D
on UNIX
or Ctrl-Z
on Windows.
BigQuery¶
A module which uploads data from BigQuery, downloads data from BigQuery, or runs an SQL query. See the Big Query Walkthrough for in-depth documentation.
Custom¶
You can define a custom source in Python by declaring a function of type
Void -> t
(where t
is any supported type except Void
)
and implementing this function in Python.
The return type of this function must be a generator that returns values of type t
.
Sinks¶
Postgres¶
Sinks.postgres<Text> {
pg_host = "localhost", pg_port = "5432",
pg_user = "user", pg_password = "123456",
pg_database = "db", pg_table = "tbl" }
Like for Postgres source,
pg_port
defaults to 5432, pg_user
defaults to postgres
, and
pg_password
defaults to the empty string. The other parameters are mandatory.
RabbitMQ (AMQP)¶
Sinks.amqp<Text> {
amqp_host = "localhost", amqp_port = "5672",
amqp_vhost = "/", amqp_exchange = "ex",
amqp_key = "key"
}
Like for AMQP source,
amqp_port
defaults to 5672 and amqp_vhost
defaults to /
.
The other parameters are mandatory.
AWS S3¶
An NStack sink for uploading files to S3 storage on Amazon Web Services
import AWS.S3:0.0.1-SNAPSHOT as S3
S3.upload { ...config... }
Functions¶
upload : {filepath: Text, data: [Byte]} -> Text
Uploads a file (represented as a sequence of bytes) to S3 with the given filepath, and returns a Text
indicating the item URL
.
Config¶
The following configuration parameters are used for uploading to S3:
s3_key_id
- Your AWS Credentials KeyIds3_secret_key
- Your AWS Credentials secret keys3_bucket
- The S3 bucket to upload items into
Stdout¶
Sinks.stdout<Text>
Sinks.stdout
has type Text
.
It does not take any arguments and does not require a type annotation,
but if the type annotation is present,
it must be Text
.
When Sinks.stdout
is used as a process’s source,
you can connect to that process by running
nstack connect $PID
where $PID
is the process id
(as reported by nstack start
and nstack ps
).
After that,
every Text
value produced by the process
will be printed to the standard output by nstack connect
.
To disconnect, simulate end-of-file by pressing Ctrl-D
on UNIX
or Ctrl-Z
on Windows.
Custom¶
You can define a custom sink in Python by declaring a function of type
t -> Void
(where t
is any supported type except Void
)
and implementing this function in Python as usual.
The return type of this function will be ignored.
Conversions¶
JSON¶
Conv.from_json<(Integer,Boolean)>
Conv.to_json<(Integer,Boolean)>
These functions convert between nstack values and Text
values
containing JSON. They have types
Conv.from_json<type> : Text -> type
Conv.to_json<type> : type -> Text
Supported types are:
Integer
Double
Boolean
Text
[Byte]
- Arrays of supported types
- Tuples of supported types
- Structs of supported types
CSV¶
Conv.from_csv<(Integer,Boolean)>
Conv.to_csv<(Integer,Boolean)>
These functions convert between nstack values and Text
values
containing comma-separated fields. They have types
Conv.from_csv<type> : Text -> type
Conv.to_csv<type> : type -> Text
Supported field types are:
Integer
Double
Boolean
(encoded asTRUE
orFALSE
)Text
[Byte]
- Optional of another supported field type
Supported row types are:
- Arrays of supported field types
- Tuples of supported field types
- Structs of supported field types
If the row type is a struct, then the first emitted or consumed value is the CSV header. The column names in the header correspond to the field names of the struct.
If the row type is an array or a tuple, no header is expected or produced.
Text values produced by to_csv
are not newline-terminated.
Text values consumed by from_csv
may or may not be newline-terminated.