Citi Bike System Data

We’ll be using Flow to capture and process Citi Bike system data. The dataset is available in the S3 “tripdata” bucket as monthly, compressed CSV files of every ride taken within the system.

Modeling Rides

Every Flow collection has an associated JSON Schema which describes its documents. Let’s begin with an example of a ride document that we want to schematize:

{
   "bike_id": 26396,
   "duration_seconds": 1193,
   "user_type": "Customer",
   "gender": 0,
   "birth_year": 1969,
   "begin": {
      "station": {
         "geo": {
            "latitude": 40.711863,
            "longitude": -73.944024
         },
         "id": 3081,
         "name": "Graham Ave & Grand St"
      },
      "timestamp": "2020-09-01 00:00:12.2020"
   },
   "end": {
      "station": {
         "geo": {
            "latitude": 40.68402,
            "longitude": -73.94977
         },
         "id": 3048,
         "name": "Putnam Ave & Nostrand Ave"
      },
      "timestamp": "2020-09-01 00:20:05.5470"
   }
}

Ride Schema

We’ve already created a JSON Schema that models Citi Bike rides (ride.schema.yaml), and we can link to it remotely from our catalog so we don’t have to repeat ourselves. A few things about it are worth pointing out:

It defines the shape that documents can take.

A “ride” document must have a bike_id, begin, and end. A “location” must have a latitude, longitude, and so on. The $ref keyword makes it easy to re-use common structures.

Validations constrain the types and values that documents can take.

A “longitude” must be a number and fall within the expected range, and “gender” must be a value within the expected enumeration. Some properties are required, while others are optional. Flow requires every document of a collection to validate against the collection’s schema before it can be added.

Flow can also translate many schema constraints (e.g. “/begin/station/id must exist and be an integer”) into other kinds of schema, like TypeScript types and SQL constraints, which promotes end-to-end type safety and a better development experience.

Annotations attach information to locations within the document.

The title and description keywords add human-readable context to locations of the document. They’re machine-accessible documentation, which makes it possible to re-use these annotations in transformed versions of the schema.
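To make the shape and validation constraints concrete, here is a hand-written JavaScript sketch of a few checks the ride schema expresses. It is not how Flow validates (Flow validates against the JSON Schema itself), and several details are assumptions rather than facts about ride.schema.yaml: standard latitude/longitude ranges, a gender enumeration of 0, 1, and 2 (the values seen in the sample data), and "geo" being optional.

```javascript
// Hand-written sketch of a few constraints like those in ride.schema.yaml.
// Assumptions (not taken from the real schema): standard lat/long ranges,
// a gender enumeration of {0, 1, 2}, and "geo" being optional.
const GENDERS = new Set([0, 1, 2]);

// Check a { latitude, longitude } location found at `path`.
function locationErrors(geo, path) {
  if (typeof geo !== "object" || geo === null) return [`${path} must be an object`];
  const errors = [];
  if (typeof geo.latitude !== "number" || !(geo.latitude >= -90 && geo.latitude <= 90)) {
    errors.push(`${path}/latitude must be a number in [-90, 90]`);
  }
  if (typeof geo.longitude !== "number" || !(geo.longitude >= -180 && geo.longitude <= 180)) {
    errors.push(`${path}/longitude must be a number in [-180, 180]`);
  }
  return errors;
}

// Check a ride endpoint ("begin" or "end") found at `path`.
function terminusErrors(terminus, path) {
  if (typeof terminus !== "object" || terminus === null) return [`${path} is required`];
  const errors = [];
  if (!Number.isInteger(terminus.station?.id)) {
    errors.push(`${path}/station/id must exist and be an integer`);
  }
  if (terminus.station?.geo !== undefined) {
    errors.push(...locationErrors(terminus.station.geo, `${path}/station/geo`));
  }
  if (typeof terminus.timestamp !== "string") {
    errors.push(`${path}/timestamp must be a string`);
  }
  return errors;
}

// Returns a list of violations; an empty list means the ride passed these checks.
function validateRide(ride) {
  const errors = [];
  if (!Number.isInteger(ride.bike_id)) errors.push("/bike_id must exist and be an integer");
  errors.push(...terminusErrors(ride.begin, "/begin"));
  errors.push(...terminusErrors(ride.end, "/end"));
  // Optional properties are only checked when present.
  if (ride.gender !== undefined && !GENDERS.has(ride.gender)) {
    errors.push("/gender must be one of 0, 1, 2");
  }
  return errors;
}

const sample = {
  bike_id: 26396,
  gender: 0,
  begin: {
    station: { geo: { latitude: 40.711863, longitude: -73.944024 }, id: 3081, name: "Graham Ave & Grand St" },
    timestamp: "2020-09-01 00:00:12.2020",
  },
  end: {
    station: { geo: { latitude: 40.68402, longitude: -73.94977 }, id: 3048, name: "Putnam Ave & Nostrand Ave" },
    timestamp: "2020-09-01 00:20:05.5470",
  },
};

console.log(validateRide(sample)); // []
console.log(validateRide({ ...sample, gender: 7 })); // [ '/gender must be one of 0, 1, 2' ]
```

Collecting every violation, rather than stopping at the first, mirrors how schema validators typically report errors and makes a rejected document easier to debug.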

Capturing Rides

To work with ride events, we first need to define a collection into which we’ll ingest them. Simple enough, but there’s a wrinkle: the source dataset consists of CSV files whose header names don’t match our schema:

$ wget https://s3.amazonaws.com/tripdata/202009-citibike-tripdata.csv.zip
$ unzip -p 202009-citibike-tripdata.csv.zip  | head -5
"tripduration","starttime","stoptime","start station id","start station name","start station latitude","start station longitude","end station id","end station name","end station latitude","end station longitude","bikeid","usertype","birth year","gender"
4225,"2020-09-01 00:00:01.0430","2020-09-01 01:10:26.6350",3508,"St Nicholas Ave & Manhattan Ave",40.809725,-73.953149,116,"W 17 St & 8 Ave",40.74177603,-74.00149746,44317,"Customer",1979,1
1868,"2020-09-01 00:00:04.8320","2020-09-01 00:31:13.7650",3621,"27 Ave & 9 St",40.7739825,-73.9309134,3094,"Graham Ave & Withers St",40.7169811,-73.94485918,37793,"Customer",1991,1
1097,"2020-09-01 00:00:06.8990","2020-09-01 00:18:24.2260",3492,"E 118 St & Park Ave",40.8005385,-73.9419949,3959,"Edgecombe Ave & W 145 St",40.823498,-73.94386,41438,"Subscriber",1984,1
1473,"2020-09-01 00:00:07.7440","2020-09-01 00:24:41.1800",3946,"St Nicholas Ave & W 137 St",40.818477,-73.947568,4002,"W 144 St & Adam Clayton Powell Blvd",40.820877,-73.939249,35860,"Customer",1990,2

Projections let us account for this by defining a mapping between document locations (as JSON Pointers) and corresponding fields in a flattened, table-based representation such as a CSV file or SQL table. Flow uses them whenever it captures from or materializes into table-based systems.

Putting it all together, let’s define a captured “rides” collection:

collections:
  - name: examples/citi-bike/rides
    key: [/bike_id, /begin/timestamp]
    schema: https://raw.githubusercontent.com/estuary/docs/developer-docs/examples/citi-bike/ride.schema.yaml
    # Define projections for each CSV header name used in the source dataset.
    projections:
      bikeid: /bike_id
      birth year: /birth_year
      end station id: /end/station/id
      end station latitude: /end/station/geo/latitude
      end station longitude: /end/station/geo/longitude
      end station name: /end/station/name
      gender: /gender
      start station id: /begin/station/id
      start station latitude: /begin/station/geo/latitude
      start station longitude: /begin/station/geo/longitude
      start station name: /begin/station/name
      starttime: /begin/timestamp
      stoptime: /end/timestamp
      tripduration: /duration_seconds
      usertype: /user_type
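As a minimal sketch of what the projections above accomplish, the following looks up each CSV column by its header name and writes its value at the mapped JSON Pointer. The CSV parsing and type handling are simplifying assumptions (quoted fields become strings, unquoted fields become numbers, no embedded newlines); Flow’s own CSV capture may behave differently. `parseCsvLine`, `setPointer`, and `rowToDocument` are hypothetical helpers.

```javascript
// Projections from the collection spec: CSV header name -> JSON Pointer.
const PROJECTIONS = {
  bikeid: "/bike_id",
  "birth year": "/birth_year",
  "end station id": "/end/station/id",
  "end station latitude": "/end/station/geo/latitude",
  "end station longitude": "/end/station/geo/longitude",
  "end station name": "/end/station/name",
  gender: "/gender",
  "start station id": "/begin/station/id",
  "start station latitude": "/begin/station/geo/latitude",
  "start station longitude": "/begin/station/geo/longitude",
  "start station name": "/begin/station/name",
  starttime: "/begin/timestamp",
  stoptime: "/end/timestamp",
  tripduration: "/duration_seconds",
  usertype: "/user_type",
};

// Minimal CSV line parser: handles quoted fields and "" escapes,
// but not newlines embedded inside quoted fields.
function parseCsvLine(line) {
  const fields = [];
  let i = 0;
  while (i <= line.length) {
    if (line[i] === '"') {
      let value = "";
      i++; // skip the opening quote
      while (i < line.length) {
        if (line[i] === '"' && line[i + 1] === '"') { value += '"'; i += 2; }
        else if (line[i] === '"') { i++; break; } // closing quote
        else { value += line[i++]; }
      }
      fields.push({ value, quoted: true });
      i++; // skip the comma after the field
    } else {
      const comma = line.indexOf(",", i);
      const end = comma === -1 ? line.length : comma;
      fields.push({ value: line.slice(i, end), quoted: false });
      i = end + 1;
    }
  }
  return fields;
}

// Write `value` into `doc` at a JSON Pointer, creating parent objects as needed.
function setPointer(doc, pointer, value) {
  const tokens = pointer.split("/").slice(1).map((t) => t.replace(/~1/g, "/").replace(/~0/g, "~"));
  let node = doc;
  for (const token of tokens.slice(0, -1)) {
    if (node[token] === undefined) node[token] = {};
    node = node[token];
  }
  node[tokens[tokens.length - 1]] = value;
}

// Build a nested document from one CSV row (quoted -> string, unquoted -> number).
function rowToDocument(headerLine, rowLine) {
  const headers = parseCsvLine(headerLine).map((f) => f.value);
  const fields = parseCsvLine(rowLine);
  const doc = {};
  headers.forEach((header, idx) => {
    const pointer = PROJECTIONS[header];
    if (pointer === undefined || fields[idx] === undefined) return; // unmapped column
    const { value, quoted } = fields[idx];
    setPointer(doc, pointer, quoted ? value : Number(value));
  });
  return doc;
}

const header = '"tripduration","starttime","stoptime","start station id","start station name","start station latitude","start station longitude","end station id","end station name","end station latitude","end station longitude","bikeid","usertype","birth year","gender"';
const row = '4225,"2020-09-01 00:00:01.0430","2020-09-01 01:10:26.6350",3508,"St Nicholas Ave & Manhattan Ave",40.809725,-73.953149,116,"W 17 St & 8 Ave",40.74177603,-74.00149746,44317,"Customer",1979,1';

console.log(JSON.stringify(rowToDocument(header, row), null, 2));
```

Running it on the first data row of the sample file yields a document with the same nested shape as the example ride shown earlier.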

As this is a tutorial, we’ll use a Flow ingestion API to capture directly from CSV. In a real-world setting, you could instead bind the collection to a pub/sub topic, an S3 bucket and path, or a database table (via change data capture). To load the tutorial data:

# Start a local development instance, and leave it running:
$ flowctl develop

# In another terminal:
$ examples/citi-bike/load-rides.sh

Last-Seen Station of a Bike

We’ll declare and test a collection that derives, for each bike, the station it last arrived at:

import:
  - rides.flow.yaml

collections:
  - name: examples/citi-bike/last-seen
    key: [/bike_id]
    schema:
      type: object
      properties:
        bike_id: { $ref: ride.schema.yaml#/properties/bike_id }
        last: { $ref: ride.schema.yaml#/$defs/terminus }
      required: [bike_id, last]

    derivation:
      transform:
        locationFromRide:
          source: { name: examples/citi-bike/rides }
          publish:
            nodeJS: |
              return [{ bike_id: source.bike_id, last: source.end }];
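Conceptually, this derivation maps each ride to a last-seen document, and documents that share a key replace one another. The sketch below assumes a last-write-wins reduction, which we understand to be Flow’s default when a schema carries no reduce annotations; `deriveLastSeen` is a hypothetical driver, not part of Flow.

```javascript
// The locationFromRide publish lambda, as a plain function.
function locationFromRide(source) {
  return [{ bike_id: source.bike_id, last: source.end }];
}

// Hypothetical driver: map each ride, then keep one document per key
// (/bike_id), letting later documents replace earlier ones
// (assumed last-write-wins reduction).
function deriveLastSeen(rides) {
  const byKey = new Map();
  for (const ride of rides) {
    for (const doc of locationFromRide(ride)) {
      byKey.set(doc.bike_id, doc);
    }
  }
  return byKey;
}

// Illustrative rides; only /bike_id and /end are used by the lambda.
// The third ride is made up for the example.
const rides = [
  { bike_id: 26396, end: { station: { id: 3048, name: "Putnam Ave & Nostrand Ave" }, timestamp: "2020-09-01 00:20:05.5470" } },
  { bike_id: 44317, end: { station: { id: 116, name: "W 17 St & 8 Ave" }, timestamp: "2020-09-01 01:10:26.6350" } },
  { bike_id: 26396, end: { station: { id: 3081, name: "Graham Ave & Grand St" }, timestamp: "2020-09-01 02:00:00.0000" } },
];

const lastSeen = deriveLastSeen(rides);
console.log(lastSeen.get(26396).last.station.name); // Graham Ave & Grand St
console.log(lastSeen.size); // 2
```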

We can materialize the collection into a database table:

$ flowctl materialize --collection examples/citi-bike/last-seen --table-name last_seen --target testDB

If you’re in VSCode, you can query the attached database using the “SQLTools” icon on the left bar. Or, use psql:

select bike_id, "last/station/name", "last/timestamp" from last_seen limit 10;

Materialization tables always use the collection’s key, and are updated continuously to reflect ongoing changes to the collection.
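The column names in the query above (e.g. "last/station/name") mirror JSON Pointers into the document. This hypothetical sketch flattens documents into fields named that way and upserts one row per key, to illustrate the shape of the materialized table. The flattening rule is inferred from that query rather than taken from Flow’s implementation, and `KeyedTable` is an in-memory stand-in, not a Flow API.

```javascript
// Flatten a nested document into fields named like the columns queried above,
// e.g. { last: { station: { name } } } -> { "last/station/name": ... }.
// Arrays and scalars are kept as leaf values.
function flatten(doc, prefix = "", out = {}) {
  for (const [key, value] of Object.entries(doc)) {
    const field = prefix ? `${prefix}/${key}` : key;
    if (value !== null && typeof value === "object" && !Array.isArray(value)) {
      flatten(value, field, out);
    } else {
      out[field] = value;
    }
  }
  return out;
}

// Hypothetical in-memory stand-in for a materialized table:
// one row per key, replaced in place whenever a new document arrives.
class KeyedTable {
  constructor(keyField) {
    this.keyField = keyField;
    this.rows = new Map();
  }
  upsert(doc) {
    const row = flatten(doc);
    this.rows.set(row[this.keyField], row);
  }
}

const lastSeenTable = new KeyedTable("bike_id");
lastSeenTable.upsert({ bike_id: 26396, last: { station: { id: 3048, name: "Putnam Ave & Nostrand Ave" }, timestamp: "2020-09-01 00:20:05.5470" } });
lastSeenTable.upsert({ bike_id: 26396, last: { station: { id: 3081, name: "Graham Ave & Grand St" }, timestamp: "2020-09-01 02:00:00.0000" } });

console.log(lastSeenTable.rows.get(26396)["last/station/name"]); // Graham Ave & Grand St
console.log(lastSeenTable.rows.size); // 1
```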

Bike Relocations

Citi Bike sometimes redistributes bikes between stations when a station gets too full or too empty. These relocations show up as “holes” in the ride data, where a bike mysteriously ends a ride at one station and starts its next ride at a different station.

Suppose we want a collection that is enriched with explicit “relocation” events. To derive it, we must determine, for each bike, whether the start of its current ride differs from the end of its previous ride. But the previous ride’s ending station isn’t available in the source document.

We can use a register to preserve each bike’s previous ending station and compare it with the starting station of its current ride:

import:
  - rides.flow.yaml

collections:
  - name: examples/citi-bike/rides-and-relocations
    key: [/bike_id, /begin/timestamp]
    schema:
      # Relocations are rides marked by a "relocation: true" property.
      $ref: ride.schema.yaml
      properties:
        relocation: { const: true }

    derivation:
      # Use a register to persist the last-arrived station for each bike.
      register:
        schema: ride.schema.yaml#/$defs/terminus
        initial:
          # Value to use if this is the first time seeing this bike.
          station: { id: 0, name: "" }
          timestamp: "0000-00-00 00:00:00.0"

      transform:
        fromRides:
          source: { name: examples/citi-bike/rides }
          shuffle: [/bike_id]
          update:
            nodeJS: return [source.end];
          publish:
            # Compare |previous| register value from before the update lambda was applied,
            # with the source document to determine if the bike mysteriously moved.
            nodeJS: |
              if (previous.station.id != 0 && previous.station.id != source.begin.station.id) {
                return [
                  { bike_id: source.bike_id, begin: previous, end: source.begin, relocation: true },
                  source,
                ];
              }
              return [source];
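To see how `previous` and the register interact, here is a plain JavaScript simulation of this derivation. It assumes each bike’s rides are processed in order, that the value returned by the update lambda simply replaces the register (a last-write-wins register), and that `publish` receives the register value from before that update as `previous`, as the comment in the spec describes. `deriveRidesAndRelocations` is a hypothetical driver, not a Flow API.

```javascript
// Register initial value from the spec: used the first time a bike is seen.
const INITIAL = { station: { id: 0, name: "" }, timestamp: "0000-00-00 00:00:00.0" };

// update lambda: the new register value is the ride's ending terminus.
function update(source) {
  return [source.end];
}

// publish lambda: compares against the register value from *before* the update.
function publish(source, register, previous) {
  if (previous.station.id !== 0 && previous.station.id !== source.begin.station.id) {
    return [
      { bike_id: source.bike_id, begin: previous, end: source.begin, relocation: true },
      source,
    ];
  }
  return [source];
}

// Hypothetical driver: one register per bike_id, replaced on every update.
function deriveRidesAndRelocations(rides) {
  const registers = new Map(); // bike_id -> last-arrived terminus
  const output = [];
  for (const source of rides) {
    const previous = registers.get(source.bike_id) ?? INITIAL;
    const [next] = update(source);
    registers.set(source.bike_id, next);
    output.push(...publish(source, next, previous));
  }
  return output;
}

const t1 = { station: { id: 3276, name: "Marin Light Rail" }, timestamp: "2020-09-01 09:21:12.3090" };
const t2 = { station: { id: 3639, name: "Harborside" }, timestamp: "2020-09-01 13:48:12.3830" };
const t3 = { station: { id: 3202, name: "Newport PATH" }, timestamp: "2020-09-01 14:33:35.1020" };
const t4 = { station: { id: 3267, name: "Morris Canal" }, timestamp: "2020-09-01 16:49:30.1610" };

// The bike is dropped off at Harborside but next departs from Newport PATH.
const derived = deriveRidesAndRelocations([
  { bike_id: 17558, begin: t1, end: t2 },
  { bike_id: 17558, begin: t3, end: t4 },
]);
console.log(derived.length); // 3
console.log(derived[1]); // the interleaved relocation event
```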

Use gazctl to observe relocation events as they’re derived:

$ gazctl journals read --block -l estuary.dev/collection=examples/citi-bike/rides-and-relocations \
   | jq -c '. | select(.relocation)'

Catalog Tests

We can use catalog tests to verify the end-to-end, integrated behavior of collections. In fact, every collection in this tutorial has associated tests, but they’re omitted here for brevity. You can find examples of comprehensive tests on GitHub.

Here’s an example of a test for the rides & relocations derivation we just built:

import:
  - rides-and-relocations.flow.yaml

tests:
  "Expect a sequence of connected rides don't produce a relocation event":
    - ingest:
        collection: examples/citi-bike/rides
        documents:
          # Bike goes from station 1 => 2 => 3 => 4.
          - &ride1
            bike_id: &bike 17558
            begin: &station1
              station: { id: 3276, name: "Marin Light Rail" }
              timestamp: "2020-09-01 09:21:12.3090"
            end: &station2
              station: { id: 3639, name: "Harborside" }
              timestamp: "2020-09-01 13:48:12.3830"
          - &ride2
            bike_id: *bike
            begin: *station2
            end: &station3
              station: { id: 3202, name: "Newport PATH" }
              timestamp: "2020-09-01 14:33:35.1020"
          - &ride3
            bike_id: *bike
            begin: *station3
            end: &station4
              station: { id: 3267, name: "Morris Canal" }
              timestamp: "2020-09-01 16:49:30.1610"
    - verify:
        collection: examples/citi-bike/rides-and-relocations
        documents: [*ride1, *ride2, *ride3]

  "Expect a disconnected ride sequence produces an interleaved relocation":
    - ingest:
        collection: examples/citi-bike/rides
        documents:
          - &ride1 { bike_id: *bike, begin: *station1, end: *station2 }
          - &ride2 { bike_id: *bike, begin: *station3, end: *station4 }
    - verify:
        collection: examples/citi-bike/rides-and-relocations
        documents:
          - *ride1
          - {
              bike_id: *bike,
              begin: *station2,
              end: *station3,
              relocation: true,
            }
          - *ride2

Station Status

Suppose we’re building a station status API. We’re bringing together some basic statistics about each station, like the number of bikes which have arrived, departed, and been relocated in or out. We also need to know which bikes are currently at each station.

To accomplish this, we’ll build a collection keyed on station IDs, into which we’ll derive documents that update our station status. We also need to tell Flow how to reduce these updates into a full view of a station’s status, which we do by adding reduce annotations to our schema. Here’s the complete schema for our station status collection:

# Compose in the "station" definition from ride.schema.yaml,
# which defines "id", "name", and "geo".
$ref: ride.schema.yaml#/$defs/station

properties:
  arrival:
    description: "Statistics on Bike arrivals to the station"
    properties:
      ride:
        title: "Bikes ridden to the station"
        type: integer
        reduce: { strategy: sum }
      move:
        title: "Bikes moved to the station"
        type: integer
        reduce: { strategy: sum }

    type: object
    reduce: { strategy: merge }

  departure:
    description: "Statistics on Bike departures from the station"
    properties:
      ride:
        title: "Bikes ridden from the station"
        type: integer
        reduce: { strategy: sum }
      move:
        title: "Bikes moved from the station"
        type: integer
        reduce: { strategy: sum }

    type: object
    reduce: { strategy: merge }

  stable:
    description: "Set of Bike IDs which are currently at this station"
    type: object

    reduce:
      strategy: set
      # Use bike IDs as their own keys.
      key: [""]

    # Sets are composed of 'add', 'intersect', and 'remove' components.
    # Here, we're representing the set as an array of integer bike IDs.
    additionalProperties:
      type: array
      items: { type: integer }
      reduce: { strategy: merge }

reduce: { strategy: merge }

Flow uses reduce annotations to build general “combiners” (in the map/reduce sense) over documents of a given schema. Those combiners are employed automatically by Flow.
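As an illustration of what such a combiner does for this schema, here is a simplified JavaScript sketch: `merge` combines objects property by property, `sum` adds integers, and the `set` applies removals and then additions to the stored bike IDs. It is only a left fold of updates into a running status, not Flow’s combiner: it does not combine two partial updates with each other, it ignores `intersect`, the remove-then-add ordering is an assumption, and un-annotated fields are assumed to take the newer value.

```javascript
// `sum` strategy: add integers, treating a missing side as 0.
function sum(a, b) {
  return (a ?? 0) + (b ?? 0);
}

// `merge` over an object whose properties (ride, move) use `sum`.
function reduceCounts(lhs = {}, rhs = {}) {
  const out = { ...lhs };
  for (const [k, v] of Object.entries(rhs)) out[k] = sum(lhs[k], v);
  return out;
}

// Simplified `set`: apply rhs removals, then rhs additions, to the IDs in lhs.add.
function reduceSet(lhs = {}, rhs = {}) {
  const members = new Set(lhs.add ?? []);
  for (const id of rhs.remove ?? []) members.delete(id);
  for (const id of rhs.add ?? []) members.add(id);
  return { add: [...members] };
}

// Top-level `merge`: un-annotated fields (id, name, geo) take the newer value.
function reduceStation(lhs, rhs) {
  const out = { ...lhs, ...rhs };
  for (const field of ["arrival", "departure"]) {
    if (lhs[field] || rhs[field]) out[field] = reduceCounts(lhs[field], rhs[field]);
  }
  if (lhs.stable || rhs.stable) out.stable = reduceSet(lhs.stable, rhs.stable);
  return out;
}

// Hypothetical updates for one station: a ride in, a move in, a ride out.
const updates = [
  { id: 3081, name: "Graham Ave & Grand St", arrival: { ride: 1 }, stable: { add: [26396] } },
  { id: 3081, name: "Graham Ave & Grand St", arrival: { move: 1 }, stable: { add: [17558] } },
  { id: 3081, name: "Graham Ave & Grand St", departure: { ride: 1 }, stable: { remove: [26396] } },
];

const status = updates.reduce(reduceStation, {});
console.log(JSON.stringify(status));
// {"id":3081,"name":"Graham Ave & Grand St","arrival":{"ride":1,"move":1},"stable":{"add":[17558]},"departure":{"ride":1}}
```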

Now we define our derivation. Since Flow handles reductions for us, our remaining responsibility is to implement the “mapper” function, which transforms source events into station status updates:

import:
  - rides-and-relocations.flow.yaml

collections:
  - name: examples/citi-bike/stations
    key: [/id]
    schema: station.schema.yaml
    projections:
      stable: /stable/add

    derivation:
      transform:
        ridesAndMoves:
          source:
            name: examples/citi-bike/rides-and-relocations
          publish:
            nodeJS: |
              if (source.relocation) {
                return [
                  {
                    departure: { move: 1 },
                    stable: { remove: [source.bike_id] },
                    ...source.begin.station,
                  },
                  {
                    arrival: { move: 1 },
                    stable: { add: [source.bike_id] },
                    ...source.end.station,
                  },
                ];
              } else {
                return [
                  {
                    departure: { ride: 1 },
                    stable: { remove: [source.bike_id] },
                    ...source.begin.station,
                  },
                  {
                    arrival: { ride: 1 },
                    stable: { add: [source.bike_id] },
                    ...source.end.station,
                  },
                ];
              }
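The two branches above differ only in which counter they increment. A more compact equivalent could choose the counter name up front; it is written here as a standalone function for illustration (`ridesAndMoves` is a plain function, not Flow’s lambda signature), and its body could presumably replace the if/else in the `nodeJS` lambda.

```javascript
// Compact equivalent of the ridesAndMoves publish lambda: relocations
// increment the "move" counters, ordinary rides the "ride" counters.
function ridesAndMoves(source) {
  const kind = source.relocation ? "move" : "ride";
  return [
    {
      departure: { [kind]: 1 },
      stable: { remove: [source.bike_id] },
      ...source.begin.station,
    },
    {
      arrival: { [kind]: 1 },
      stable: { add: [source.bike_id] },
      ...source.end.station,
    },
  ];
}

const harborside = { station: { id: 3639, name: "Harborside" }, timestamp: "2020-09-01 13:48:12.3830" };
const newport = { station: { id: 3202, name: "Newport PATH" }, timestamp: "2020-09-01 14:33:35.1020" };

// An ordinary ride, then a relocation between the same two stations.
console.log(ridesAndMoves({ bike_id: 17558, begin: harborside, end: newport }));
console.log(ridesAndMoves({ bike_id: 17558, begin: harborside, end: newport, relocation: true }));
```

A computed property name (`[kind]`) keeps the two code paths from drifting apart if the update documents change later.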

Now we can materialize the collection into a PostgreSQL table and have a live view into stations:

$ flowctl materialize --collection examples/citi-bike/stations --table-name stations --target testDB

-- Current bikes at each station.
select id, name, stable from stations order by name asc limit 10;
-- Station arrivals and departures.
select id, name, "arrival/ride", "departure/ride", "arrival/move", "departure/move"
   from stations order by name asc limit 10;

Idle Bikes

We’re next tasked with identifying when bikes have sat idle at a station for an extended period of time. This is a potential signal that something is wrong with the bike, and customers are avoiding it.

Event-driven systems usually aren’t terribly good at detecting when things haven’t happened. At this point, an engineer will often reach for a task scheduler like Airflow and set up a job that takes periodic snapshots of bike locations and compares them to find bikes whose locations haven’t changed.

Flow offers a simpler approach, which is to join the rides collection with itself, using a read delay:

import:
  - rides.flow.yaml

collections:
  # Derive idle bikes via two transforms of rides:
  # * One reads in real-time, and stores the ride timestamp in a register.
  # * Two reads with a delay, and checks whether the last
  #   ride timestamp hasn't updated since this (delayed) ride.
  - name: examples/citi-bike/idle-bikes
    schema:
      type: object
      properties:
        bike_id: { type: integer }
        station: { $ref: ride.schema.yaml#/$defs/terminus }
      required: [bike_id, station]

    key: [/bike_id, /station/timestamp]

    derivation:
      register:
        # Store the most-recent ride timestamp for each bike_id,
        # and default to null if the bike hasn't ridden before.
        schema: { type: [string, "null"] }
        initial: null

      transform:
        liveRides:
          source:
            name: examples/citi-bike/rides
          shuffle: [/bike_id]
          update:
            nodeJS: return [source.end.timestamp];

        delayedRides:
          source:
            name: examples/citi-bike/rides
          shuffle: [/bike_id]
          # Use a 2-day read delay, relative to the document's ingestion.
          # To see read delays in action within a short-lived
          # testing context, try using a smaller value (e.g., 2m).
          readDelay: "48h"
          publish:
            nodeJS: |
              // Publish if the bike hasn't moved in 2 days.
              if (register === source.end.timestamp) {
                return [{ bike_id: source.bike_id, station: source.end }];
              }
              return [];
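To see why the self-join works, here is a JavaScript simulation that replays each ingested ride twice: once immediately (the liveRides update) and once 48 hours later (the delayedRides publish). It assumes events are processed in time order, with live reads first on ties; `deriveIdleBikes`, the `at` ingestion times, and the bikes and stations in the example are hypothetical.

```javascript
const HOUR_MS = 60 * 60 * 1000;
const READ_DELAY_MS = 48 * HOUR_MS; // readDelay: "48h"

// Hypothetical simulation of the idle-bikes self-join. `ingested` holds
// { at, ride } pairs, where `at` is the (assumed) ingestion time in ms.
function deriveIdleBikes(ingested) {
  const events = [];
  for (const { at, ride } of ingested) {
    events.push({ time: at, live: true, ride }); // liveRides
    events.push({ time: at + READ_DELAY_MS, live: false, ride }); // delayedRides
  }
  // Process in time order; on ties, live reads go first (an assumption).
  events.sort((a, b) => a.time - b.time || Number(b.live) - Number(a.live));

  const registers = new Map(); // bike_id -> most recent end timestamp
  const idle = [];
  for (const { live, ride } of events) {
    if (live) {
      // update lambda: remember this ride's end timestamp.
      registers.set(ride.bike_id, ride.end.timestamp);
    } else if ((registers.get(ride.bike_id) ?? null) === ride.end.timestamp) {
      // publish lambda: no newer ride was seen in the 48h since this one.
      idle.push({ bike_id: ride.bike_id, station: ride.end });
    }
  }
  return idle;
}

const t0 = Date.UTC(2020, 8, 1); // 2020-09-01T00:00:00Z
const idle = deriveIdleBikes([
  { at: t0, ride: { bike_id: 1, end: { station: { id: 10, name: "A" }, timestamp: "2020-09-01 00:10:00.0000" } } },
  { at: t0 + HOUR_MS / 2, ride: { bike_id: 2, end: { station: { id: 20, name: "B" }, timestamp: "2020-09-01 00:40:00.0000" } } },
  { at: t0 + HOUR_MS, ride: { bike_id: 1, end: { station: { id: 30, name: "C" }, timestamp: "2020-09-01 01:10:00.0000" } } },
]);
// Bike 1 rides again within 48h, so only its second ride is reported idle.
console.log(idle.map((d) => `${d.bike_id}@${d.station.station.name}`)); // [ '2@B', '1@C' ]
```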

After the read delay has elapsed, we’ll start to see events in the “idle-bikes” collection:

$ gazctl journals read --block -l estuary.dev/collection=examples/citi-bike/idle-bikes