Wikipedia Edits

We’ll use Flow to model Wikipedia page edit data, inspired by the Druid documentation.

Captured Edits

Our source dataset has documents like:

{
   "time": "2015-09-12T22:02:05.807Z",
   "channel": "#en.wikipedia",
   "cityName": "New York",
   "comment": "/* Life and career */",
   "countryIsoCode": "US",
   "countryName": "United States",
   "isAnonymous": true,
   "isMinor": false,
   "isNew": false,
   "isRobot": false,
   "isUnpatrolled": false,
   "metroCode": 501,
   "namespace": "Main",
   "page": "Louis Gruenberg",
   "regionIsoCode": "NY",
   "regionName": "New York",
   "user": "68.175.31.28",
   "delta": 178,
   "added": 178,
   "deleted": 0
}

Here’s a captured collection for these page edits:

collections:
  - name: examples/wiki/edits
    key: [/time, /page]
    # Inline schema which partially describes the edit dataset:
    schema:
      type: object
      required: [time, page, channel]
      properties:
        time: { type: string }
        page: { type: string }
        channel: { type: string }
        countryIsoCode: { type: [string, "null"] }
        added: { type: integer }
        deleted: { type: integer }
    # Declare channel (e.g. "#en.wikipedia") as a logical partition:
    projections:
      channel:
        location: /channel
        partition: true
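To make the key, `required`, and partition declarations concrete, here is a minimal JavaScript sketch of what they mean for a single edit document. The helpers `missingRequired`, `keyOf` and `partitionOf` are hypothetical illustrations, not part of Flow's API. Flow itself validates documents against the full schema and extracts keys and partitions internally.

```javascript
// Hypothetical helpers illustrating the collection spec above.
// Flow performs validation, key extraction, and partitioning itself.

// Fields listed under `required` in the inline schema.
const REQUIRED = ["time", "page", "channel"];

// Returns the names of required fields that are absent from `doc`.
function missingRequired(doc) {
  return REQUIRED.filter((field) => !(field in doc));
}

// The collection key is [/time, /page]. Documents that share
// the same key are combined (reduced) together by Flow.
function keyOf(doc) {
  return [doc.time, doc.page];
}

// `channel` is declared as a logical partition: each distinct
// channel value forms its own logical partition of the collection.
function partitionOf(doc) {
  return { channel: doc.channel };
}

const edit = {
  time: "2015-09-12T22:02:05.807Z",
  channel: "#en.wikipedia",
  page: "Louis Gruenberg",
  countryIsoCode: "US",
  added: 178,
  deleted: 0,
};

console.log(missingRequired(edit)); // []
console.log(keyOf(edit));
console.log(partitionOf(edit));
```

A document missing `time` or `channel` would fail validation, and two edits of the same page at the same timestamp would share a key.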

We’ll use Flow’s ingestion API to capture the collection. In a production setting, the collection would more likely be bound to a pub/sub topic or to an S3 bucket and path:

# Start a local development instance, and leave it running:
$ flowctl develop

# In another terminal:
$ examples/wiki/load-pages.sh

Page Roll-up

We can roll up edits by page to understand the edit statistics of each one, including a per-country breakdown (where the country is known):

import:
  - edits.flow.yaml

collections:
  - name: examples/wiki/pages
    key: [/page]
    # Inline schema which rolls up page edit statistics,
    # including a per-country breakdown:
    schema:
      $defs:
        counter:
          type: integer
          reduce: { strategy: sum }

        stats:
          type: object
          reduce: { strategy: merge }
          properties:
            cnt: { $ref: "#/$defs/counter" }
            add: { $ref: "#/$defs/counter" }
            del: { $ref: "#/$defs/counter" }

      type: object
      $ref: "#/$defs/stats"
      properties:
        page: { type: string }
        byCountry:
          type: object
          reduce: { strategy: merge }
          additionalProperties: { $ref: "#/$defs/stats" }
      required: [page]

    # /byCountry is an object (which isn't projected by default),
    # and we'd like to materialize it to a column.
    projections:
      byCountry: /byCountry

    derivation:
      transform:
        rollUpEdits:
          source:
            name: examples/wiki/edits
          publish:
            nodeJS: |
              let stats = {cnt: 1, add: source.added, del: source.deleted};

              if (source.countryIsoCode) {
                return [{
                    page: source.page,
                    byCountry: {[source.countryIsoCode]: stats},
                    ...stats,
                }];
              }
              // Unknown country.
              return [{page: source.page, ...stats}];
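The `publish` lambda above is ordinary JavaScript, so its behavior can be checked outside of Flow. The sketch below wraps the same body in a hypothetical `rollUpEdits(source)` function. Flow generates its own wrapper around the lambda, so this name and signature are assumptions for illustration only. It runs the lambda on one edit with a known country and one with a `null` country.

```javascript
// Hypothetical standalone copy of the `rollUpEdits` publish lambda.
// In Flow, `source` is bound to each document of examples/wiki/edits.
function rollUpEdits(source) {
  const stats = { cnt: 1, add: source.added, del: source.deleted };

  if (source.countryIsoCode) {
    // Attribute this edit to its country as well as to the page totals.
    return [{
      page: source.page,
      byCountry: { [source.countryIsoCode]: stats },
      ...stats,
    }];
  }
  // Unknown (null or missing) country: contribute only to page totals.
  return [{ page: source.page, ...stats }];
}

const known = rollUpEdits({ page: "Louis Gruenberg", countryIsoCode: "US", added: 178, deleted: 0 });
const unknown = rollUpEdits({ page: "Louis Gruenberg", countryIsoCode: null, added: 5, deleted: 2 });

console.log(JSON.stringify(known));
// [{"page":"Louis Gruenberg","byCountry":{"US":{"cnt":1,"add":178,"del":0}},"cnt":1,"add":178,"del":0}]
console.log(JSON.stringify(unknown));
// [{"page":"Louis Gruenberg","cnt":1,"add":5,"del":2}]
```

Each edit publishes a partial document with `cnt: 1`. The lambda never reads prior state. Running totals come from the `reduce` annotations in the collection schema.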

Materialize pages to a test database:

$ flowctl materialize --collection examples/wiki/pages --table-name pages --target testDB

Query for popular pages. As page edits are captured into the source collection, the page roll-up derivation and its materialization update continuously. If the ingest completes too quickly to observe this, you can re-run it:

SELECT page, cnt, add, del, "byCountry" FROM pages WHERE cnt > 10;
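The counts stay current because documents published for the same `/page` key are combined using the schema's `reduce` annotations: `sum` for each counter, and `merge` for the stats object and the `byCountry` map. The sketch below is a hand-written emulation of those semantics for this specific schema only. The functions `sumStats`, `reducePage` and `rollUp` are hypothetical. Flow's real reduction is driven generically by the schema annotations, not by code like this. The sketch folds a few published documents per page, then applies a filter analogous to the `cnt > 10` query, with a smaller threshold to suit the toy input.

```javascript
// Hypothetical emulation of the reductions declared in the pages schema:
//   counters (cnt, add, del)      -> reduce { strategy: sum }
//   stats objects and byCountry   -> reduce { strategy: merge }

// Sum the counters of two stats objects (an absent counter counts as 0).
function sumStats(a, b) {
  return {
    cnt: (a.cnt ?? 0) + (b.cnt ?? 0),
    add: (a.add ?? 0) + (b.add ?? 0),
    del: (a.del ?? 0) + (b.del ?? 0),
  };
}

// Combine two documents that share the same /page key.
function reducePage(lhs, rhs) {
  const out = { page: rhs.page, ...sumStats(lhs, rhs) };
  // Merge the byCountry maps, summing stats for countries present in both.
  const countries = { ...(lhs.byCountry ?? {}) };
  for (const [code, stats] of Object.entries(rhs.byCountry ?? {})) {
    countries[code] = code in countries ? sumStats(countries[code], stats) : stats;
  }
  if (Object.keys(countries).length > 0) out.byCountry = countries;
  return out;
}

// Fold a stream of published documents into one roll-up per page.
function rollUp(published) {
  const pages = new Map();
  for (const doc of published) {
    const prior = pages.get(doc.page);
    pages.set(doc.page, prior ? reducePage(prior, doc) : doc);
  }
  return pages;
}

const published = [
  { page: "Louis Gruenberg", byCountry: { US: { cnt: 1, add: 178, del: 0 } }, cnt: 1, add: 178, del: 0 },
  { page: "Louis Gruenberg", cnt: 1, add: 5, del: 2 },
  { page: "Louis Gruenberg", byCountry: { US: { cnt: 1, add: 10, del: 4 } }, cnt: 1, add: 10, del: 4 },
  { page: "Other Page", cnt: 1, add: 1, del: 0 },
];

const pages = rollUp(published);
// Louis Gruenberg totals: cnt 3, add 193, del 6; byCountry.US: cnt 2, add 188, del 4
console.log(pages.get("Louis Gruenberg"));

// Analogous to `WHERE cnt > 10`, using a threshold of 1 for this small input.
const popular = [...pages.values()].filter((p) => p.cnt > 1).map((p) => p.page);
console.log(popular);
```

These reductions are associative, so documents can be combined in any grouping. This is what allows the roll-up and its materialized rows to be updated incrementally as new edits arrive, without recomputing from scratch.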