Wikipedia Edits
We’ll use Flow to model Wikipedia page edit data, inspired by the Druid documentation.
Captured Edits
Our source dataset has documents like:
{
  "time": "2015-09-12T22:02:05.807Z",
  "channel": "#en.wikipedia",
  "cityName": "New York",
  "comment": "/* Life and career */",
  "countryIsoCode": "US",
  "countryName": "United States",
  "isAnonymous": true,
  "isMinor": false,
  "isNew": false,
  "isRobot": false,
  "isUnpatrolled": false,
  "metroCode": 501,
  "namespace": "Main",
  "page": "Louis Gruenberg",
  "regionIsoCode": "NY",
  "regionName": "New York",
  "user": "68.175.31.28",
  "delta": 178,
  "added": 178,
  "deleted": 0
}
Here’s a captured collection for these page edits:
collections:
  - name: examples/wiki/edits
    key: [/time, /page]
    # Inline schema which partially describes the edit dataset:
    schema:
      type: object
      required: [time, page, channel]
      properties:
        time: { type: string }
        page: { type: string }
        channel: { type: string }
        countryIsoCode: { type: [string, "null"] }
        added: { type: integer }
        deleted: { type: integer }
    # Declare channel (e.g. "#en.wikipedia") as a logical partition:
    projections:
      channel:
        location: /channel
        partition: true
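To make the `key` and partition declarations concrete, here is a minimal sketch in JavaScript (the language of the derivation lambdas) showing how the JSON pointers `/time` and `/page` select a composite key from an edit document, and how `/channel` yields the logical partition value. The helpers `resolvePointer` and `keyAndPartition` are hypothetical illustrations, not part of Flow's API; Flow performs this extraction itself from the collection spec.

```javascript
// Hypothetical helper: resolve a JSON Pointer (RFC 6901) such as "/time"
// against a document. Returns undefined if any path component is missing.
function resolvePointer(doc, pointer) {
  if (pointer === "") return doc;
  return pointer
    .slice(1)
    .split("/")
    // Unescape per RFC 6901: "~1" -> "/", then "~0" -> "~".
    .map((tok) => tok.replace(/~1/g, "/").replace(/~0/g, "~"))
    .reduce((node, tok) => (node == null ? undefined : node[tok]), doc);
}

// Hypothetical helper mirroring the collection spec:
// key: [/time, /page], with /channel projected as a logical partition.
function keyAndPartition(doc) {
  return {
    key: ["/time", "/page"].map((ptr) => resolvePointer(doc, ptr)),
    partition: { channel: resolvePointer(doc, "/channel") },
  };
}

// A trimmed-down version of the sample edit document.
const edit = {
  time: "2015-09-12T22:02:05.807Z",
  channel: "#en.wikipedia",
  page: "Louis Gruenberg",
  countryIsoCode: "US",
  added: 178,
  deleted: 0,
};

// Logs the composite [time, page] key and the channel partition value.
console.log(keyAndPartition(edit));
```

Because the key includes `/time`, two edits of the same page at different times are distinct documents in this collection; only the derived roll-up below keys on `/page` alone.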
We’ll use Flow’s ingestion API to capture the collection. In a production setting, the collection might instead be bound to a pub/sub topic or to an S3 bucket and path:
# Start a local development instance, and leave it running:
$ flowctl develop
# In another terminal:
$ examples/wiki/load-pages.sh
Page Roll-up
We can roll up edits by page to understand the edit statistics of each one, including a by-country breakdown (where the country is known):
import:
  - edits.flow.yaml
collections:
  - name: examples/wiki/pages
    key: [/page]
    # Inline schema which rolls up page edit statistics,
    # including a per-country breakdown:
    schema:
      $defs:
        counter:
          type: integer
          reduce: { strategy: sum }
        stats:
          type: object
          reduce: { strategy: merge }
          properties:
            cnt: { $ref: "#/$defs/counter" }
            add: { $ref: "#/$defs/counter" }
            del: { $ref: "#/$defs/counter" }
      type: object
      $ref: "#/$defs/stats"
      properties:
        page: { type: string }
        byCountry:
          type: object
          reduce: { strategy: merge }
          additionalProperties: { $ref: "#/$defs/stats" }
      required: [page]
    # /byCountry is an object (which isn't projected by default),
    # and we'd like to materialize it to a column.
    projections:
      byCountry: /byCountry
    derivation:
      transform:
        rollUpEdits:
          source:
            name: examples/wiki/edits
          publish:
            nodeJS: |
              let stats = {cnt: 1, add: source.added, del: source.deleted};
              if (source.countryIsoCode) {
                return [{
                  page: source.page,
                  byCountry: {[source.countryIsoCode]: stats},
                  ...stats,
                }];
              }
              // Unknown country.
              return [{page: source.page, ...stats}];
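The `reduce` annotations determine how documents published under the same `/page` key combine: `sum` adds counters, and `merge` combines objects property by property, reducing each property by its own annotation. Below is a minimal JavaScript sketch that emulates only those two strategies for this particular schema, with the `rollUpEdits` lambda body wrapped as a plain function. The functions `rollUpEdits`, `reduceStats`, `reducePage`, and `rollUp` are hypothetical illustrations; Flow applies reductions itself from the schema annotations, and this sketch ignores other strategies, schema validation, and missing counters.

```javascript
// The publish lambda from the derivation, wrapped as a plain function.
function rollUpEdits(source) {
  let stats = { cnt: 1, add: source.added, del: source.deleted };
  if (source.countryIsoCode) {
    return [{ page: source.page, byCountry: { [source.countryIsoCode]: stats }, ...stats }];
  }
  // Unknown country.
  return [{ page: source.page, ...stats }];
}

// Emulates $defs/stats: a `merge` whose cnt/add/del properties use `sum`.
// Returns a new object and never mutates its inputs.
function reduceStats(lhs, rhs) {
  const out = { ...lhs };
  for (const field of ["cnt", "add", "del"]) {
    if (rhs[field] !== undefined) out[field] = (lhs[field] ?? 0) + rhs[field];
  }
  return out;
}

// Emulates the root document: stats are summed, and the byCountry map is
// merged so that each country's entry is itself reduced as stats.
function reducePage(lhs, rhs) {
  // `page` is identical for documents sharing the /page key.
  const out = { ...reduceStats(lhs, rhs), page: rhs.page };
  if (lhs.byCountry || rhs.byCountry) {
    const byCountry = { ...(lhs.byCountry ?? {}) };
    for (const [country, stats] of Object.entries(rhs.byCountry ?? {})) {
      byCountry[country] =
        country in byCountry ? reduceStats(byCountry[country], stats) : { ...stats };
    }
    out.byCountry = byCountry;
  }
  return out;
}

// Folds a stream of edits into one rolled-up document per page.
function rollUp(edits) {
  const pages = new Map();
  for (const edit of edits) {
    for (const doc of rollUpEdits(edit)) {
      const prev = pages.get(doc.page);
      pages.set(doc.page, prev ? reducePage(prev, doc) : doc);
    }
  }
  return pages;
}

// Illustrative edits (hypothetical values, not from the dataset).
const pages = rollUp([
  { page: "Louis Gruenberg", countryIsoCode: "US", added: 178, deleted: 0 },
  { page: "Louis Gruenberg", countryIsoCode: null, added: 10, deleted: 4 },
  { page: "Louis Gruenberg", countryIsoCode: "US", added: 2, deleted: 1 },
]);
// Totals: cnt 3, add 190, del 5; byCountry.US: cnt 2, add 180, del 1.
console.log(JSON.stringify(pages.get("Louis Gruenberg")));
```

The edit with a `null` country still counts toward the page's top-level totals but adds no `byCountry` entry, which is why the per-country counters can sum to less than the page totals.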
Materialize pages to a test database:
$ flowctl materialize --collection examples/wiki/pages --table-name pages --target testDB
Query for popular pages. As page edits are captured into the source collection, the page roll-up derivation and its materialization update accordingly. You can repeat the ingest if it completes too quickly:
SELECT page, cnt, add, del, "byCountry" FROM pages WHERE cnt > 10;