Ingesting Data¶
Note
Flow’s current ingestion capabilities should be considered a proof of concept. We plan to make it much simpler to express integrations into pub/sub systems, databases for “change data capture”, and more.
There are a number of options for ingesting data into Flow:
HTTP PUT or POST requests
Stream data over a websocket in CSV, TSV, or JSON format
flow-ingester¶
Flow ships with the flow-ingester binary, which provides network service endpoints for ingesting data into Flow Collections. There are currently two main APIs: a REST API that accepts data in HTTP PUT and POST requests, and a websocket API that accepts data streamed over websocket connections. Only captured collections may ingest data in this way, not derivations.
When you run flowctl develop, Flow Ingester listens on http://localhost:8081 by default.
Flow Ingester will always validate all documents against the collection’s schema before they are written, so invalid data will never be added to the collection. Note that your collection schema may be as permissive as you like, and you can always apply more restrictive schemas in derivations if you want to.
Flow Ingester will also reduce all documents according to the collection key and reduction annotations on the schema, if present. This is done to optimize the storage space for collections that see frequent updates to the same key.
REST API¶
The REST API makes it easy to add data to one or more Flow collections transactionally. The endpoint is available at /ingest (e.g. http://localhost:8081/ingest). This endpoint responds only to PUT and POST requests with a Content-Type: application/json. Any other method or content type will result in a 404 error response. The request body should be a JSON object where the keys are names of Flow Collections, and the values are arrays of documents for that collection. For example,
curl -H 'Content-Type: application/json' --data @- 'http://localhost:8081/ingest' <<EOF
{
  "examples/citi-bike/rides": [
    {
      "bike_id": 7,
      "begin": {
        "timestamp": "2020-08-27 09:30:01.2",
        "station": {
          "id": 3,
          "name": "Start Name"
        }
      },
      "end": {
        "timestamp": "2020-08-27 10:00:02.3",
        "station": {
          "id": 4,
          "name": "End Name"
        }
      }
    }
  ]
}
EOF
Running the above should result in output similar to the following:
{"Offsets":{"examples/citi-bike/rides/pivot=00":305},"Etcd":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":28,"raft_term":2}}
In this example, we are ingesting a single document (beginning with { "bike_id": 7,...) into the collection examples/citi-bike/rides. You may ingest any number of documents into any number of Flow Collections in a single request body, and they will be added in a single transaction.
The response Offsets includes all of the Gazette journals where the data was written, along with the new “head” of each journal. This is provided to allow applications to read data directly from Gazette or cloud storage if desired.
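The same request can be issued from any HTTP client. The sketch below, in Python using only the standard library, builds an /ingest request body and posts it. It is an illustration, not part of Flow itself; the base URL assumes flow-ingester is listening at http://localhost:8081 as it does under flowctl develop.

```python
import json
import urllib.request


def build_ingest_body(docs_by_collection):
    """Serialize a mapping of collection name -> list of documents
    into the JSON body expected by the /ingest endpoint."""
    return json.dumps(docs_by_collection).encode("utf-8")


def ingest(base_url, docs_by_collection):
    """POST documents to Flow Ingester's REST API as one transaction."""
    req = urllib.request.Request(
        url=base_url + "/ingest",
        data=build_ingest_body(docs_by_collection),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)  # {"Offsets": {...}, "Etcd": {...}}


# Example payload: one ride document for one collection.
ride = {
    "bike_id": 7,
    "begin": {"timestamp": "2020-08-27 09:30:01.2",
              "station": {"id": 3, "name": "Start Name"}},
    "end": {"timestamp": "2020-08-27 10:00:02.3",
            "station": {"id": 4, "name": "End Name"}},
}
body = build_ingest_body({"examples/citi-bike/rides": [ride]})
# With a local stack running, you would call:
# ingest("http://localhost:8081", {"examples/citi-bike/rides": [ride]})
```

Because the whole request body commits as a single transaction, a client can safely retry the entire request on error without risking partial writes.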
REST Transactional Semantics¶
Flow Ingester ingests the data using a single Gazette transaction per REST request. For details on Gazette transactions, see the Gazette Transactional Append docs. In summary:
If the HTTP response indicates success, then the documents are guaranteed to be written to the Gazette brokers and replicated.
If the HTTP response indicates an error, then the transaction will not be committed and no derivations will observe any of the documents.
Websocket API¶
The Websocket API provides an alternative for ingesting data, which is especially useful when you don’t know how much data there is ahead of time, or when you don’t need precise control over transaction boundaries. When ingesting over a websocket, the ingester will automatically divide the data into periodic transactions to provide optimal performance. The websocket API is also more flexible in the data formats that it can accept, so it’s able to ingest CSV/TSV data directly, in addition to JSON. The websocket API is only able to ingest into a single collection per websocket connection, though.
The collection for websocket ingestions is given in the path of the URL, as in /ingest/<collection-name>. For example, to ingest into the examples/citi-bike/rides collection, you’d use ws://localhost:8081/ingest/examples/citi-bike/rides.
For all websocket ingestions, the Sec-WebSocket-Protocol header must be set when initiating the websocket connection. The value must be one of:
json/v1
csv/v1
tsv/v1
If you’re using the websocat CLI, then you can simply use the --protocol option.
Ingesting JSON over Websocket¶
When ingesting JSON, Flow Ingester accepts data over the websocket in “JSON-newline” (a.k.a. JSON Lines) format.
Objects should not be enclosed within an array or have any separator characters between them except for whitespace.
For example, to ingest a few rides into the examples/citi-bike/rides collection, let’s start with the documents in JSON Lines format in the file rides.jsonl:
{"bike_id":7,"begin":{"timestamp":"2020-08-27 09:30:01","station":{"id":66,"name":"North 4th St"}},"end":{"timestamp":"2020-08-27 10:00:02","station":{"id":23,"name":"High St"}}}
{"bike_id":26,"begin":{"timestamp":"2020-08-27 09:32:01","station":{"id":91,"name":"Grant Ave"}},"end":{"timestamp":"2020-08-27 09:50:12","station":{"id":23,"name":"High St"}}}
Given the above content in a file named rides.jsonl, we could ingest it using websocat like so:
cat rides.jsonl | websocat --protocol json/v1 'ws://localhost:8081/ingest/examples/citi-bike/rides'
This will add the data to the collection named examples/citi-bike/rides.
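If you’re producing the JSON Lines input programmatically, each document is simply serialized onto its own line, with no enclosing array. A minimal Python sketch that writes the two ride documents shown above:

```python
import json

rides = [
    {"bike_id": 7,
     "begin": {"timestamp": "2020-08-27 09:30:01",
               "station": {"id": 66, "name": "North 4th St"}},
     "end": {"timestamp": "2020-08-27 10:00:02",
             "station": {"id": 23, "name": "High St"}}},
    {"bike_id": 26,
     "begin": {"timestamp": "2020-08-27 09:32:01",
               "station": {"id": 91, "name": "Grant Ave"}},
     "end": {"timestamp": "2020-08-27 09:50:12",
             "station": {"id": 23, "name": "High St"}}},
]

# One JSON object per line; no enclosing array, and nothing
# between documents except the newline itself.
jsonl = "\n".join(json.dumps(r) for r in rides) + "\n"
with open("rides.jsonl", "w") as f:
    f.write(jsonl)
```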
Ingesting CSV/TSV over Websocket¶
Flow Ingester is able to ingest a few different character-separated formats. Currently it supports comma-separated (CSV) and tab-separated (TSV) data, using the csv/v1 and tsv/v1 protocols, respectively. Flow collections always store all data as JSON documents that validate against the collection’s schema, so the tabular data in character-separated files must be converted to JSON before being written. Flow Ingester converts these for you, based on the headers in the data and the projections of the Flow Collection. Each header in a character-separated ingestion must have the same name as a projection of the Collection, and that projection maps the field named by the header to the JSON pointer used to construct the JSON document. For example, the examples/citi-bike/rides collection declares a projection bikeid for the document location /bike_id, along with projections for the other ride fields.
Given these projections, we could ingest a CSV file that looks like:
bikeid,starttime,"start station id","start station name",stoptime,"end station id","end station name"
7,"2020-08-27 09:30:01",66,"North 4th St","2020-08-27 10:00:02",23,"High St"
26,"2020-08-27 09:32:01",91,"Grant Ave","2020-08-27 09:50:12",23,"High St"
Assuming this was the content of rides.csv, you could ingest it using:
cat rides.csv | websocat --protocol csv/v1 'ws://localhost:8081/ingest/examples/citi-bike/rides'
The actual JSON documents that would be written to the collection are:
{"bike_id":7,"begin":{"timestamp":"2020-08-27 09:30:01","station":{"id":66,"name":"North 4th St"}},"end":{"timestamp":"2020-08-27 10:00:02","station":{"id":23,"name":"High St"}}}
{"bike_id":26,"begin":{"timestamp":"2020-08-27 09:32:01","station":{"id":91,"name":"Grant Ave"}},"end":{"timestamp":"2020-08-27 09:50:12","station":{"id":23,"name":"High St"}}}
For example, the projection bikeid: /bike_id means that, for each row in the CSV, the value of the “bikeid” column is used to populate the bike_id property of the final document. Flow uses the collection’s JSON schema to determine the required type of each property. Additionally, each document that’s constructed is validated against the collection’s schema before it is written.
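Conceptually, the conversion applies each header’s JSON pointer to build up a nested document. The Python sketch below illustrates the idea; it is not Flow Ingester’s implementation. Only bikeid: /bike_id appears explicitly in this example, so the rest of the projection table, and the simple integer typing, are hypothetical reconstructions standing in for the schema-driven typing Flow performs.

```python
import csv
import io

# Hypothetical projection table: CSV header -> JSON pointer.
# Only "bikeid: /bike_id" is stated in the docs; the rest are
# inferred from the example CSV and its resulting documents.
PROJECTIONS = {
    "bikeid": "/bike_id",
    "starttime": "/begin/timestamp",
    "start station id": "/begin/station/id",
    "start station name": "/begin/station/name",
    "stoptime": "/end/timestamp",
    "end station id": "/end/station/id",
    "end station name": "/end/station/name",
}
# Stand-in for schema-driven typing: these columns hold integers.
INTEGER_FIELDS = {"bikeid", "start station id", "end station id"}


def set_pointer(doc, pointer, value):
    """Set `value` at the location named by a JSON pointer like /a/b/c."""
    parts = pointer.lstrip("/").split("/")
    for part in parts[:-1]:
        doc = doc.setdefault(part, {})
    doc[parts[-1]] = value


def rows_to_docs(csv_text):
    """Convert each CSV row into a nested JSON document via projections."""
    for row in csv.DictReader(io.StringIO(csv_text)):
        doc = {}
        for header, raw in row.items():
            value = int(raw) if header in INTEGER_FIELDS else raw
            set_pointer(doc, PROJECTIONS[header], value)
        yield doc


csv_text = '''bikeid,starttime,"start station id","start station name",stoptime,"end station id","end station name"
7,"2020-08-27 09:30:01",66,"North 4th St","2020-08-27 10:00:02",23,"High St"
'''
docs = list(rows_to_docs(csv_text))
```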
Null, Empty, and Missing Values¶
In JSON documents, there’s a difference between an explicit null value and one that’s undefined. When Flow Ingester parses a character-separated row, it likewise differentiates between null, empty-string, and undefined values. An empty value is interpreted as an explicit null as long as the schema location allows null values (e.g. type: ["integer", "null"]). If the schema does not allow null as an acceptable type, but it does allow string, then the value is interpreted as an empty string. A row may also have fewer values than exist in the header row. If it does, then any unspecified column values will be undefined in the final document. In the following example, let’s assume that the schema allows the types named by each column.
id,string,stringOrNull,integerOrNull
1,"","",""
2,,,
3,
4
Assuming simple direct projections, this would result in the following JSON documents being ingested:
{"id": 1, "string": "", "stringOrNull": null, "integerOrNull": null}
{"id": 2, "string": "", "stringOrNull": null, "integerOrNull": null}
{"id": 3, "string": ""}
{"id": 4}
Note how in rows 1 and 2, empty stringOrNull values are mapped to null, regardless of the presence of quotes. In row 3, the trailing comma indicates that the row has two values, and that the second value is empty (""), but the remainder are undefined. In row 4, all values besides id are undefined.
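The rules above can be sketched as a small interpretation function. This is an illustration of the described behavior, not Flow Ingester’s code; the schema is represented as a hypothetical map from column name to allowed JSON types, mirroring the example’s column names.

```python
# Allowed JSON types per column, mirroring the example above,
# where each column is named for the types its schema allows.
SCHEMA_TYPES = {
    "id": ["integer"],
    "string": ["string"],
    "stringOrNull": ["string", "null"],
    "integerOrNull": ["integer", "null"],
}

_UNDEFINED = object()  # sentinel: column absent from the row entirely


def interpret(column, raw):
    """Interpret one raw CSV value according to the rules above."""
    if raw is _UNDEFINED:
        return _UNDEFINED  # fewer values than headers: stays undefined
    types = SCHEMA_TYPES[column]
    if raw == "":
        if "null" in types:
            return None    # empty -> explicit null when null is allowed
        if "string" in types:
            return ""      # otherwise empty string when string is allowed
        raise ValueError(f"empty value not allowed for {column!r}")
    return int(raw) if "integer" in types else raw


def row_to_doc(headers, values):
    """Build a flat document, omitting undefined columns."""
    padded = values + [_UNDEFINED] * (len(headers) - len(values))
    doc = {}
    for col, raw in zip(headers, padded):
        v = interpret(col, raw)
        if v is not _UNDEFINED:
            doc[col] = v
    return doc


headers = ["id", "string", "stringOrNull", "integerOrNull"]
print(row_to_doc(headers, ["3", ""]))  # row `3,`
print(row_to_doc(headers, ["4"]))      # row `4`
```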
Websocket Responses¶
Regardless of which format you ingest, all websocket ingestions will return responses similar to the following:
{"Offsets":{"examples/citi-bike/rides/pivot=00":545},"Etcd":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":28,"raft_term":2},"Processed":2}
The response shows the offsets of the transaction boundaries in the Gazette journals. If you ingest larger amounts of data, you will receive many such responses. In addition to the journal offsets, each response also includes the Processed property, which indicates the number of websocket frames that have been successfully ingested. This allows clients to resume where they left off in case a websocket ingestion fails partway through. For example, if you send one JSON object per websocket frame, then the Processed field tells you how many documents were successfully ingested prior to the failure (Processed times the number of documents per frame).
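A client can use the last Processed count it received to decide where to resume. A minimal sketch, where docs_per_frame is an assumption about how the client framed its own stream:

```python
def resume_index(last_response, docs_per_frame=1):
    """Index of the first document to re-send after a failure, given
    the last successful response received from Flow Ingester."""
    return last_response["Processed"] * docs_per_frame


last = {"Offsets": {"examples/citi-bike/rides/pivot=00": 545},
        "Processed": 2}
start = resume_index(last)  # re-send documents[start:], skipping the first two
```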