Citi Bike System Data¶
We’ll be using Flow to capture and process Citi Bike system data. The dataset is available in the S3 “tripdata” bucket as compressed CSV files of each ride taken within the system, by month.
Modeling Rides¶
Every Flow collection has an associated JSON Schema which describes its documents. Let’s begin with an example of a ride document that we want to schematize:
{
  "bike_id": 26396,
  "duration_seconds": 1193,
  "user_type": "Customer",
  "gender": 0,
  "birth_year": 1969,
  "begin": {
    "station": {
      "geo": {
        "latitude": 40.711863,
        "longitude": -73.944024
      },
      "id": 3081,
      "name": "Graham Ave & Grand St"
    },
    "timestamp": "2020-09-01 00:00:12.2020"
  },
  "end": {
    "station": {
      "geo": {
        "latitude": 40.68402,
        "longitude": -73.94977
      },
      "id": 3048,
      "name": "Putnam Ave & Nostrand Ave"
    },
    "timestamp": "2020-09-01 00:20:05.5470"
  }
}
Ride Schema¶
We’ve already gone to the trouble of creating a JSON Schema which models Citi Bike rides, which you can find here. We can remotely link to it from our catalog so we don’t have to repeat ourselves. A few things about it to point out:
- It defines the shape that documents can take. A “ride” document must have a bike_id, begin, and end. A “location” must have a latitude, longitude, and so on. The $ref keyword makes it easy to re-use common structures.
- Validations constrain the types and values that documents can take. A “longitude” must be a number and fall within the expected range, and “gender” must be a value within the expected enumeration. Some properties are required, while others are optional. Flow enforces that all documents of a collection must validate against its schema before they can be added. Flow is also able to translate many schema constraints (e.g. “/begin/station/id must exist and be an integer”) into other kinds of schema, like TypeScript types and SQL constraints, which promotes end-to-end type safety and a better development experience.
- Annotations attach information to locations within the document. The title and description keywords give color to locations of the document. They’re machine-accessible documentation, which makes it possible to re-use these annotations in transformed versions of the schema.
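To make the points above concrete, here is a minimal sketch of what a TypeScript view of the ride schema might look like, with a hand-written check for a few of the constraints. The interface names, the choice of optional fields, and the latitude/longitude bounds are assumptions made for illustration; the linked schema is the authoritative definition, and Flow performs this validation itself.

```typescript
// Hypothetical TypeScript view of the ride schema. Field names follow the
// example document; optionality and numeric ranges are assumptions.
interface GeoLocation {
  latitude: number;
  longitude: number;
}

interface Station {
  id: number;
  name: string;
  geo?: GeoLocation;
}

interface Terminus {
  station: Station;
  timestamp: string;
}

interface Ride {
  bike_id: number;
  begin: Terminus;
  end: Terminus;
  duration_seconds?: number;
  user_type?: string;
  gender?: number;
  birth_year?: number;
}

// Returns human-readable violations; an empty list means the ride passed
// every check implemented here (a small subset of what a schema can express).
function validateRide(doc: Ride): string[] {
  const errors: string[] = [];
  if (!Number.isInteger(doc.bike_id)) {
    errors.push("/bike_id must be an integer");
  }
  for (const side of ["begin", "end"] as const) {
    const station = doc[side].station;
    if (!Number.isInteger(station.id)) {
      errors.push(`/${side}/station/id must be an integer`);
    }
    const geo = station.geo;
    if (geo === undefined) continue;
    // Assumed ranges: the usual geographic bounds.
    if (geo.latitude < -90 || geo.latitude > 90) {
      errors.push(`/${side}/station/geo/latitude out of range`);
    }
    if (geo.longitude < -180 || geo.longitude > 180) {
      errors.push(`/${side}/station/geo/longitude out of range`);
    }
  }
  return errors;
}
```

The error messages reuse JSON Pointer notation for document locations, which is the same notation the rest of this tutorial uses for keys and projections.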
Capturing Rides¶
To work with ride events, first we need to define a collection into which we’ll ingest them. Simple enough, but a wrinkle is that the source dataset is CSV files, using header names which don’t match our schema:
$ wget https://s3.amazonaws.com/tripdata/202009-citibike-tripdata.csv.zip
$ unzip -p 202009-citibike-tripdata.csv.zip | head -5
"tripduration","starttime","stoptime","start station id","start station name","start station latitude","start station longitude","end station id","end station name","end station latitude","end station longitude","bikeid","usertype","birth year","gender"
4225,"2020-09-01 00:00:01.0430","2020-09-01 01:10:26.6350",3508,"St Nicholas Ave & Manhattan Ave",40.809725,-73.953149,116,"W 17 St & 8 Ave",40.74177603,-74.00149746,44317,"Customer",1979,1
1868,"2020-09-01 00:00:04.8320","2020-09-01 00:31:13.7650",3621,"27 Ave & 9 St",40.7739825,-73.9309134,3094,"Graham Ave & Withers St",40.7169811,-73.94485918,37793,"Customer",1991,1
1097,"2020-09-01 00:00:06.8990","2020-09-01 00:18:24.2260",3492,"E 118 St & Park Ave",40.8005385,-73.9419949,3959,"Edgecombe Ave & W 145 St",40.823498,-73.94386,41438,"Subscriber",1984,1
1473,"2020-09-01 00:00:07.7440","2020-09-01 00:24:41.1800",3946,"St Nicholas Ave & W 137 St",40.818477,-73.947568,4002,"W 144 St & Adam Clayton Powell Blvd",40.820877,-73.939249,35860,"Customer",1990,2
Projections let us account for this, by defining a mapping between document locations (as JSON Pointers) and corresponding fields in a flattened, table-based representation such as a CSV file or SQL table. They’re used whenever Flow is capturing from or materializing into table-systems.
Putting it all together, let’s define a captured “rides” collection:
collections:
  - name: examples/citi-bike/rides
    key: [/bike_id, /begin/timestamp]
    schema: https://raw.githubusercontent.com/estuary/docs/developer-docs/examples/citi-bike/ride.schema.yaml
    # Define projections for each CSV header name used in the source dataset.
    projections:
      bikeid: /bike_id
      birth year: /birth_year
      end station id: /end/station/id
      end station latitude: /end/station/geo/latitude
      end station longitude: /end/station/geo/longitude
      end station name: /end/station/name
      gender: /gender
      start station id: /begin/station/id
      start station latitude: /begin/station/geo/latitude
      start station longitude: /begin/station/geo/longitude
      start station name: /begin/station/name
      starttime: /begin/timestamp
      stoptime: /end/timestamp
      tripduration: /duration_seconds
      usertype: /user_type
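As a rough illustration of what a projection does, the sketch below turns one flat CSV row into a nested document by writing each field at the location its JSON Pointer names. The helper names (`setPointer`, `rowToDocument`) are hypothetical and only a subset of the projections is included; this is not Flow's implementation, only the idea behind the mapping.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// A subset of the projections declared above: CSV header -> JSON Pointer.
const projections: Record<string, string> = {
  bikeid: "/bike_id",
  "start station id": "/begin/station/id",
  "start station name": "/begin/station/name",
  starttime: "/begin/timestamp",
  "end station id": "/end/station/id",
  stoptime: "/end/timestamp",
  tripduration: "/duration_seconds",
};

// Set `value` at the location named by a JSON Pointer, creating intermediate
// objects as needed. Simplification: object members only, no array indices.
function setPointer(doc: { [key: string]: Json }, pointer: string, value: Json): void {
  // RFC 6901 escapes: "~1" stands for "/", and "~0" stands for "~".
  const tokens = pointer
    .split("/")
    .slice(1)
    .map((t) => t.replace(/~1/g, "/").replace(/~0/g, "~"));
  const last = tokens[tokens.length - 1];
  if (last === undefined) return; // The empty pointer names the whole document.

  let node = doc;
  for (const token of tokens.slice(0, -1)) {
    const child = node[token];
    if (typeof child !== "object" || child === null || Array.isArray(child)) {
      const created: { [key: string]: Json } = {};
      node[token] = created;
      node = created;
    } else {
      node = child;
    }
  }
  node[last] = value;
}

// Build a nested document from a flat row; fields without a projection are skipped.
function rowToDocument(row: Record<string, Json>): { [key: string]: Json } {
  const doc: { [key: string]: Json } = {};
  for (const [field, value] of Object.entries(row)) {
    const pointer = projections[field];
    if (pointer !== undefined) setPointer(doc, pointer, value);
  }
  return doc;
}
```

Running `rowToDocument` over the first data row of the CSV sample would place `tripduration` at `/duration_seconds` and `start station id` at `/begin/station/id`, which is the shape the ride schema expects.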
As this is a tutorial, we’ll use a Flow ingestion API to capture directly from CSV. In a real-world setting, you could instead bind the collection to a pub/sub topic, S3 bucket and path, or a database table (via change data capture):
# Start a local development instance, and leave it running:
$ flowctl develop
# In another terminal:
$ examples/citi-bike/load-rides.sh
Last-Seen Station of a Bike¶
We’ll declare and test a collection that derives, for each bike, the station it last arrived at:
import:
  - rides.flow.yaml
collections:
  - name: examples/citi-bike/last-seen
    key: [/bike_id]
    schema:
      type: object
      properties:
        bike_id: { $ref: ride.schema.yaml#/properties/bike_id }
        last: { $ref: ride.schema.yaml#/$defs/terminus }
      required: [bike_id, last]
    derivation:
      transform:
        locationFromRide:
          source: { name: examples/citi-bike/rides }
          publish:
            nodeJS: |
              return [{ bike_id: source.bike_id, last: source.end }];
We can materialize the collection into a database table:
$ flowctl materialize --collection examples/citi-bike/last-seen --table-name last_seen --target testDB
If you’re in VSCode, you can query the attached database using the “SQLTools” icon on the left bar. Or, use psql:
select bike_id, "last/station/name", "last/timestamp" from last_seen limit 10;
Materialization tables always use the collection’s key, and are updated continuously to reflect ongoing changes to the collection.
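The effect of keying on /bike_id can be sketched in a few lines. The in-memory `Map` below is a hypothetical stand-in for the materialized table, and it assumes that a later document for the same key simply replaces the earlier one (last write wins), since the last-seen schema declares no other reduction.

```typescript
interface Station { id: number; name: string }
interface Terminus { station: Station; timestamp: string }
interface Ride { bike_id: number; begin: Terminus; end: Terminus }
interface LastSeen { bike_id: number; last: Terminus }

// Mirrors the publish lambda of the last-seen derivation.
function locationFromRide(source: Ride): LastSeen[] {
  return [{ bike_id: source.bike_id, last: source.end }];
}

// Hypothetical stand-in for the materialized table: one row per key (/bike_id).
// Assumption: a later document for the same key replaces the earlier one.
function materialize(rides: Ride[]): Map<number, LastSeen> {
  const table = new Map<number, LastSeen>();
  for (const ride of rides) {
    for (const doc of locationFromRide(ride)) {
      table.set(doc.bike_id, doc);
    }
  }
  return table;
}
```

However many rides a bike takes, the table holds a single row for it, and that row always reflects the station where its most recently processed ride ended.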
Bike Relocations¶
Citi Bike will sometimes redistribute bikes between stations, when a station gets too full or empty. These relocations show up as “holes” in the ride data, where a bike mysteriously ends a ride at one station and starts its next ride at a different station.
Suppose we want a collection which is enriched with explicit “relocation” events. To derive it, we must determine, for each bike, whether the start of the current ride differs from the end of the previous ride. But we don’t have the prior ending station available in the source document.
We can use registers to preserve a previous ending station, and compare it with a current starting station for each bike:
import:
  - rides.flow.yaml
collections:
  - name: examples/citi-bike/rides-and-relocations
    key: [/bike_id, /begin/timestamp]
    schema:
      # Relocations are rides marked by a "relocation: true" property.
      $ref: ride.schema.yaml
      properties:
        relocation: { const: true }
    derivation:
      # Use a register to persist the last-arrived station for each bike.
      register:
        schema: ride.schema.yaml#/$defs/terminus
        initial:
          # Value to use if this is the first time seeing this bike.
          station: { id: 0, name: "" }
          timestamp: "0000-00-00 00:00:00.0"
      transform:
        fromRides:
          source: { name: examples/citi-bike/rides }
          shuffle: [/bike_id]
          update:
            nodeJS: return [source.end];
          publish:
            # Compare |previous| register value from before the update lambda was applied,
            # with the source document to determine if the bike mysteriously moved.
            nodeJS: |
              if (previous.station.id != 0 && previous.station.id != source.begin.station.id) {
                return [
                  { bike_id: source.bike_id, begin: previous, end: source.begin, relocation: true },
                  source,
                ];
              }
              return [source];
Use gazctl to observe relocation events, as they’re derived:
$ gazctl journals read --block -l estuary.dev/collection=examples/citi-bike/rides-and-relocations \
| jq -c '. | select(.relocation)'
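To see how the register drives this derivation, the following sketch simulates it in memory. It keeps one register per bike (the shuffle key), applies the update lambda to each ride, and hands the publish logic the register value from before that update. The `deriveRelocations` function and its `Map` of registers are illustrative assumptions, not how Flow stores or schedules register state.

```typescript
interface Station { id: number; name: string }
interface Terminus { station: Station; timestamp: string }
interface Ride { bike_id: number; begin: Terminus; end: Terminus; relocation?: true }

// Initial register value, as declared in the derivation.
const initial: Terminus = {
  station: { id: 0, name: "" },
  timestamp: "0000-00-00 00:00:00.0",
};

function deriveRelocations(rides: Ride[]): Ride[] {
  // One register per shuffle key (/bike_id). Hypothetical in-memory stand-in.
  const registers = new Map<number, Terminus>();
  const out: Ride[] = [];
  for (const source of rides) {
    // Register value from before this ride's update is applied.
    const previous = registers.get(source.bike_id) ?? initial;
    // "update" lambda: the register becomes the station where this ride ended.
    registers.set(source.bike_id, source.end);
    // "publish" lambda: emit a relocation if the bike started somewhere
    // other than where it was last seen.
    if (previous.station.id !== 0 && previous.station.id !== source.begin.station.id) {
      out.push({ bike_id: source.bike_id, begin: previous, end: source.begin, relocation: true });
    }
    out.push(source);
  }
  return out;
}
```

Feeding it a ride that ends at one station followed by a ride of the same bike that begins at another yields three documents, with the synthesized relocation in the middle.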
Catalog Tests¶
We can use catalog tests to verify the end-to-end, integrated behavior of collections. In fact, all of the collections in this tutorial have associated tests, but they’re omitted here for brevity. You can find examples of comprehensive tests on GitHub.
Here’s an example of a test for the rides & relocations derivation we just built:
import:
  - rides-and-relocations.flow.yaml
tests:
  "Expect a sequence of connected rides don't produce a relocation event":
    - ingest:
        collection: examples/citi-bike/rides
        documents:
          # Bike goes from station 1 => 2 => 3 => 4.
          - &ride1
            bike_id: &bike 17558
            begin: &station1
              station: { id: 3276, name: "Marin Light Rail" }
              timestamp: "2020-09-01 09:21:12.3090"
            end: &station2
              station: { id: 3639, name: "Harborside" }
              timestamp: "2020-09-01 13:48:12.3830"
          - &ride2
            bike_id: *bike
            begin: *station2
            end: &station3
              station: { id: 3202, name: "Newport PATH" }
              timestamp: "2020-09-01 14:33:35.1020"
          - &ride3
            bike_id: *bike
            begin: *station3
            end: &station4
              station: { id: 3267, name: "Morris Canal" }
              timestamp: "2020-09-01 16:49:30.1610"
    - verify:
        collection: examples/citi-bike/rides-and-relocations
        documents: [*ride1, *ride2, *ride3]
  "Expect a disconnected ride sequence produces an interleaved relocation":
    - ingest:
        collection: examples/citi-bike/rides
        documents:
          - &ride1 { bike_id: *bike, begin: *station1, end: *station2 }
          - &ride2 { bike_id: *bike, begin: *station3, end: *station4 }
    - verify:
        collection: examples/citi-bike/rides-and-relocations
        documents:
          - *ride1
          - {
              bike_id: *bike,
              begin: *station2,
              end: *station3,
              relocation: true,
            }
          - *ride2
Station Status¶
Suppose we’re building a station status API. We’re bringing together some basic statistics about each station, like the number of bikes which have arrived, departed, and been relocated in or out. We also need to know which bikes are currently at each station.
To accomplish this, we’ll build a collection keyed on station IDs, into which we’ll derive documents that update our station status. However, we need to tell Flow how to reduce these updates into a full view of a station’s status, by adding reduce annotations to our schema. Here’s the complete schema for our station status collection:
# Compose in the "station" definition from ride.schema.yaml,
# which defines "id", "name", and "geo".
$ref: ride.schema.yaml#/$defs/station
properties:
  arrival:
    description: "Statistics on Bike arrivals to the station"
    properties:
      ride:
        title: "Bikes ridden to the station"
        type: integer
        reduce: { strategy: sum }
      move:
        title: "Bikes moved to the station"
        type: integer
        reduce: { strategy: sum }
    type: object
    reduce: { strategy: merge }
  departure:
    description: "Statistics on Bike departures from the station"
    properties:
      ride:
        title: "Bikes ridden from the station"
        type: integer
        reduce: { strategy: sum }
      move:
        title: "Bikes moved from the station"
        type: integer
        reduce: { strategy: sum }
    type: object
    reduce: { strategy: merge }
  stable:
    description: "Set of Bike IDs which are currently at this station"
    type: object
    reduce:
      strategy: set
      # Use bike IDs as their own keys.
      key: [""]
    # Sets are composed of 'add', 'intersect', and 'remove' components.
    # Here, we're representing the set as an array of integer bike IDs.
    additionalProperties:
      type: array
      items: { type: integer }
      reduce: { strategy: merge }
reduce: { strategy: merge }
Flow uses reduce annotations to build general “combiners” (in the map/reduce sense) over documents of a given schema. Those combiners are employed automatically by Flow.
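A combiner of this kind can be sketched as a small recursive function. The version below models only the sum and merge strategies from the schema above and deliberately leaves out set; the `Strategy` type, the `reduce` function, and the last-write-wins fallback for locations without an annotation are assumptions made for illustration rather than Flow's actual implementation.

```typescript
// Hypothetical, simplified model of reduce annotations: either "sum", or
// "merge" with a nested strategy per property. The "set" strategy is omitted.
type Strategy =
  | { strategy: "sum" }
  | { strategy: "merge"; properties: { [name: string]: Strategy } };

type Value = number | string | { [name: string]: Value };

// Combine a previously reduced value (lhs) with a new update (rhs).
function reduce(strategy: Strategy | undefined, lhs: Value | undefined, rhs: Value): Value {
  // Assumption: without an annotation, or without a prior value, the new value wins.
  if (lhs === undefined || strategy === undefined) return rhs;
  if (strategy.strategy === "sum") {
    return typeof lhs === "number" && typeof rhs === "number" ? lhs + rhs : rhs;
  }
  // "merge": reduce objects property by property, recursing with each
  // property's own strategy.
  if (typeof lhs !== "object" || typeof rhs !== "object") return rhs;
  const out: { [name: string]: Value } = { ...lhs };
  for (const name of Object.keys(rhs)) {
    const value = rhs[name];
    if (value === undefined) continue;
    out[name] = reduce(strategy.properties[name], lhs[name], value);
  }
  return out;
}

// The arrival/departure portion of the station schema, in this sketch's terms.
const counts: Strategy = {
  strategy: "merge",
  properties: { ride: { strategy: "sum" }, move: { strategy: "sum" } },
};
const stationStatus: Strategy = {
  strategy: "merge",
  properties: { arrival: counts, departure: counts },
};
```

Reducing two updates for the same station with `stationStatus` adds their ride and move counters together while leaving the station's id and name as they were in the newer update.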
Now we define our derivation. Since Flow is handling reductions for us, our remaining responsibility is to implement the “mapper” function which will transform source events into station status updates:
import:
  - rides-and-relocations.flow.yaml
collections:
  - name: examples/citi-bike/stations
    key: [/id]
    schema: station.schema.yaml
    projections:
      stable: /stable/add
    derivation:
      transform:
        ridesAndMoves:
          source:
            name: examples/citi-bike/rides-and-relocations
          publish:
            nodeJS: |
              if (source.relocation) {
                return [
                  {
                    departure: { move: 1 },
                    stable: { remove: [source.bike_id] },
                    ...source.begin.station,
                  },
                  {
                    arrival: { move: 1 },
                    stable: { add: [source.bike_id] },
                    ...source.end.station,
                  },
                ];
              } else {
                return [
                  {
                    departure: { ride: 1 },
                    stable: { remove: [source.bike_id] },
                    ...source.begin.station,
                  },
                  {
                    arrival: { ride: 1 },
                    stable: { add: [source.bike_id] },
                    ...source.end.station,
                  },
                ];
              }
Now we can materialize the collection into a PostgreSQL table, and have a live view into stations:
$ flowctl materialize --collection examples/citi-bike/stations --table-name stations --target testDB
-- Current bikes at each station.
select id, name, stable from stations order by name asc limit 10;
-- Station arrivals and departures.
select id, name, "arrival/ride", "departure/ride", "arrival/move", "departure/move"
from stations order by name asc limit 10;
Idle Bikes¶
We’re next tasked with identifying when bikes have sat idle at a station for an extended period of time. This is a potential signal that something is wrong with the bike, and customers are avoiding it.
Event-driven systems usually aren’t terribly good at detecting when things haven’t happened. At this point, an engineer will often reach for a task scheduler like Airflow, and set up a job that takes periodic snapshots of bike locations, and compares them to find ones which haven’t changed.
Flow offers a simpler approach, which is to join the rides collection with itself, using a read delay:
import:
  - rides.flow.yaml
collections:
  # Derive idle bikes via two transforms of rides:
  # * One reads in real-time, and stores the ride timestamp in a register.
  # * Two reads with a delay, and checks whether the last
  #   ride timestamp hasn't updated since this (delayed) ride.
  - name: examples/citi-bike/idle-bikes
    schema:
      type: object
      properties:
        bike_id: { type: integer }
        station: { $ref: ride.schema.yaml#/$defs/terminus }
      required: [bike_id, station]
    key: [/bike_id, /station/timestamp]
    derivation:
      register:
        # Store the most-recent ride timestamp for each bike_id,
        # and default to null if the bike hasn't been ridden before.
        schema: { type: [string, "null"] }
        initial: null
      transform:
        liveRides:
          source:
            name: examples/citi-bike/rides
          shuffle: [/bike_id]
          update:
            nodeJS: return [source.end.timestamp];
        delayedRides:
          source:
            name: examples/citi-bike/rides
          shuffle: [/bike_id]
          # Use a 2-day read delay, relative to the document's ingestion.
          # To see read delays in action within short-lived
          # testing contexts, try using a smaller value (e.g., 2m).
          readDelay: "48h"
          publish:
            nodeJS: |
              // Publish if the bike hasn't moved in 2 days.
              if (register === source.end.timestamp) {
                return [{ bike_id: source.bike_id, station: source.end }];
              }
              return [];
After the read delay has elapsed, we’ll start to see events in the “idle-bikes” collection:
$ gazctl journals read --block -l estuary.dev/collection=examples/citi-bike/idle-bikes
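The delayed self-join used for idle bikes can be simulated with a plain event loop, which may help in reasoning about when a document is published. In the sketch below, every ingested ride is read twice: once at its ingestion time by the live transform, and once `readDelay` later by the delayed transform. The hour-based clock, the `Ingested` wrapper, and the `deriveIdleBikes` function are hypothetical; Flow manages read times and register state itself.

```typescript
interface Terminus { station: { id: number; name: string }; timestamp: string }
interface Ride { bike_id: number; end: Terminus }
interface Ingested { ride: Ride; ingestedAt: number } // hours on a hypothetical clock
interface IdleBike { bike_id: number; station: Terminus }
interface Read { at: number; delayed: boolean; ride: Ride }

function deriveIdleBikes(log: Ingested[], readDelay: number, now: number): IdleBike[] {
  // Each ingested ride is read twice: immediately, and again after the delay.
  const reads: Read[] = [];
  for (const { ride, ingestedAt } of log) {
    reads.push({ at: ingestedAt, delayed: false, ride });
    reads.push({ at: ingestedAt + readDelay, delayed: true, ride });
  }
  // Process reads in time order; on ties, live reads go first (an assumption).
  reads.sort((a, b) => a.at - b.at || Number(a.delayed) - Number(b.delayed));

  // One register per bike, holding its most recent ride timestamp.
  const registers = new Map<number, string | null>();
  const out: IdleBike[] = [];
  for (const read of reads) {
    if (read.at > now) break; // This read hasn't happened yet.
    const ride = read.ride;
    if (!read.delayed) {
      // liveRides "update" lambda.
      registers.set(ride.bike_id, ride.end.timestamp);
    } else if ((registers.get(ride.bike_id) ?? null) === ride.end.timestamp) {
      // delayedRides "publish" lambda: no newer ride arrived during the delay.
      out.push({ bike_id: ride.bike_id, station: ride.end });
    }
  }
  return out;
}
```

A ride is reported as idle only if the register still holds its own end timestamp when the delayed read arrives, meaning no later ride of that bike was ingested in the meantime.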