Reductions

Flow implements a number of reduction strategies for use within schemas, which tell Flow how two instances of a document can be meaningfully combined.

Guarantees

In Flow, documents having the same collection key and written to the same logical partition have a “total order”, meaning that one document is universally understood to have been written before the other.

This doesn’t hold for documents of the same key written to different logical partitions. These documents can be considered “mostly” ordered: Flow uses timestamps to understand the relative ordering of these documents, and while this largely does the “Right Thing”, small amounts of re-ordering are possible and even likely.

Flow guarantees exactly-once semantics within derived collections and materializations (so long as the target system supports transactions), and each document reduction is applied exactly once.

Flow does not guarantee that documents are reduced in sequential order, directly into a “base” document. For example, documents of a single Flow ingest transaction are combined into one document per collection key at ingestion time – and that document may in turn be combined with still others, and so on until a final reduction into the base document occurs.

Taken together, these “total order” and “exactly-once” guarantees mean that reduction strategies must be associative [e.g. (2 + 3) + 4 = 2 + (3 + 4) ], but need not be commutative [ 2 + 3 = 3 + 2 ] or idempotent [ S ∪ S = S ]. These properties expand the palette of strategies which can be implemented, and allow for more efficient implementations as compared to, e.g., CRDTs.

In documentation, we’ll refer to the “left-hand side” (LHS) as the preceding document, and the “right-hand side” (RHS) as the following one. Keep in mind that both the LHS and RHS may themselves represent a combination of still more ordered documents (i.e., reductions are applied associatively).

Note

Estuary has many future plans for reduction annotations:

  • More strategies, including data sketches like HyperLogLogs, T-Digests, etc.

  • Eviction policies and constraints, for bounding the sizes of objects and arrays with fine-grained removal ordering.

What’s here today could be considered a minimal, useful proof-of-concept.

append

append works with arrays, and extends the left-hand array with items of the right-hand side.

collections:
  - name: example/reductions/append
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        value:
          # Append only works with type "array".
          # Others will error at build time.
          type: array
          reduce: { strategy: append }
      required: [key]
    key: [/key]

tests:
  "Expect we can append arrays":
    - ingest:
        collection: example/reductions/append
        documents:
          - { key: "key", value: [1, 2] }
          - { key: "key", value: [3, null, "abc"] }
    - verify:
        collection: example/reductions/append
        documents:
          - { key: "key", value: [1, 2, 3, null, "abc"] }

The right-hand side must always be an array. The left-hand side may be null, in which case the reduction is treated as a no-op and its result remains null. This can be combined with schema conditionals to “toggle” whether reduction should be done or not.
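
For illustration, here’s a hedged sketch (not one of the standard examples) of how the value property above might express such a toggle, assuming the applicable branch is selected by the incoming document, as in the Composing with Conditionals section below:

        value:
          oneOf:
            # When a null value arrives, take it as-is (lastWriteWins). The
            # location then remains null, and later appends reduce as no-ops.
            - type: "null"
              reduce: { strategy: lastWriteWins }
            # Otherwise extend the array as usual.
            - type: array
              reduce: { strategy: append }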

firstWriteWins / lastWriteWins

firstWriteWins always takes the first value seen at the annotated location. Likewise lastWriteWins always takes the last. Schemas which don’t have an explicit reduce annotation default to lastWriteWins behavior.

collections:
  - name: example/reductions/fww-lww
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        fww: { reduce: { strategy: firstWriteWins } }
        lww: { reduce: { strategy: lastWriteWins } }
      required: [key]
    key: [/key]

tests:
  "Expect we can track first- and list-written values":
    - ingest:
        collection: example/reductions/fww-lww
        documents:
          - { key: "key", fww: "one", lww: "one" }
          - { key: "key", fww: "two", lww: "two" }
    - verify:
        collection: example/reductions/fww-lww
        documents:
          - { key: "key", fww: "one", lww: "two" }

merge

merge reduces the LHS and RHS by recursively reducing shared document locations. The LHS and RHS must either both be objects, or both be arrays.

If both sides are objects, merge performs a deep merge of each property. If LHS and RHS are both arrays, items at each index of both sides are merged, extending the shorter of the two sides by taking items of the longer:

collections:
  - name: example/reductions/merge
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        value:
          # Merge only works with types "array" or "object".
          # Others will error at build time.
          type: [array, object]
          reduce: { strategy: merge }
          # Deeply merge sub-locations (items or properties) by summing them.
          items:
            type: number
            reduce: { strategy: sum }
          additionalProperties:
            type: number
            reduce: { strategy: sum }
      required: [key]
    key: [/key]

tests:
  "Expect we can merge arrays by index":
    - ingest:
        collection: example/reductions/merge
        documents:
          - { key: "key", value: [1, 1] }
          - { key: "key", value: [2, 2, 2] }
    - verify:
        collection: example/reductions/merge
        documents:
          - { key: "key", value: [3, 3, 2] }

  "Expect we can merge objects by property":
    - ingest:
        collection: example/reductions/merge
        documents:
          - { key: "key", value: { "a": 1, "b": 1 } }
          - { key: "key", value: { "a": 1, "c": 1 } }
    - verify:
        collection: example/reductions/merge
        documents:
          - { key: "key", value: { "a": 2, "b": 1, "c": 1 } }

Merge may also take a key, which is one or more JSON pointers that are relative to the reduced location. If both sides are arrays and a merge key is present, then a deep sorted merge of the respective items is done, as ordered by the key. Arrays must be pre-sorted and de-duplicated by the key, and merge itself always maintains this invariant.

Note that a key of [“”] can be used for natural item ordering, e.g. when merging sorted arrays of scalars.
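
For instance, here’s a minimal sketch of merging pre-sorted arrays of integers by their natural order; the collection name is hypothetical, and the verified result follows from the sorted-merge semantics above. The fuller example that follows uses an explicit key of [/k] over object items.

collections:
  - name: example/reductions/merge-scalars
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        value:
          type: array
          # With key [""], items are compared by their own values.
          reduce:
            strategy: merge
            key: [""]
          items: { type: integer }
      required: [key]
    key: [/key]

tests:
  "Expect sorted scalar arrays merge by natural order":
    - ingest:
        collection: example/reductions/merge-scalars
        documents:
          - { key: "key", value: [1, 3] }
          - { key: "key", value: [2, 3] }
    - verify:
        collection: example/reductions/merge-scalars
        documents:
          # Equal items (3 and 3) are de-duplicated by the merge.
          - { key: "key", value: [1, 2, 3] }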

collections:
  - name: example/reductions/merge-key
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        value:
          type: array
          reduce:
            strategy: merge
            key: [/k]
          items: { reduce: { strategy: firstWriteWins } }
      required: [key]
    key: [/key]

tests:
  "Expect we can merge sorted arrays":
    - ingest:
        collection: example/reductions/merge-key
        documents:
          - { key: "key", value: [{ k: "a", v: 1 }, { k: "b", v: 1 }] }
          - { key: "key", value: [{ k: "a", v: 2 }, { k: "c", v: 2 }] }
    - verify:
        collection: example/reductions/merge-key
        documents:
          - {
              key: "key",
              value: [{ k: "a", v: 1 }, { k: "b", v: 1 }, { k: "c", v: 2 }],
            }

As with append, the left-hand side of merge may be null, in which case the reduction is treated as a no-op and its result remains null.

minimize / maximize

minimize and maximize reduce by taking the smallest (or largest) seen value.

collections:
  - name: example/reductions/min-max
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        min: { reduce: { strategy: minimize } }
        max: { reduce: { strategy: maximize } }
      required: [key]
    key: [/key]

tests:
  "Expect we can min/max values":
    - ingest:
        collection: example/reductions/min-max
        documents:
          - { key: "key", min: 32, max: "abc" }
          - { key: "key", min: 42, max: "def" }
    - verify:
        collection: example/reductions/min-max
        documents:
          - { key: "key", min: 32, max: "def" }

Minimize and maximize can also take a key, which is one or more JSON pointers that are relative to the reduced location. Keys make it possible to min/max over complex types, by ordering over an extracted composite key.

If an RHS document’s key equals the current LHS minimum (or maximum), then the documents are deeply merged. This can be used to, for example, track not just the minimum value but also the number of times it’s been seen:

collections:
  - name: example/reductions/min-max-key
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        min:
          $anchor: min-max-value
          type: array
          items:
            - type: string
            - type: number
              reduce: { strategy: sum }
          reduce:
            strategy: minimize
            key: [/0]
        max:
          $ref: "#min-max-value"
          reduce:
            strategy: maximize
            key: [/0]
      required: [key]
    key: [/key]

tests:
  "Expect we can min/max values using a key extractor":
    - ingest:
        collection: example/reductions/min-max-key
        documents:
          - { key: "key", min: ["a", 1], max: ["a", 1] }
          - { key: "key", min: ["c", 2], max: ["c", 2] }
          - { key: "key", min: ["b", 3], max: ["b", 3] }
          - { key: "key", min: ["a", 4], max: ["a", 4] }
    - verify:
        collection: example/reductions/min-max-key
        documents:
          # Min of equal keys ["a", 1] and ["a", 4] => ["a", 5].
          - { key: "key", min: ["a", 5], max: ["c", 2] }

set

set interprets the document location as an update to a set.

The location must be an object having only “add”, “intersect”, and “remove” properties. Any single “add”, “intersect”, or “remove” is always allowed.

A document with “intersect” and “add” is allowed, and is interpreted as applying the intersection to the LHS set, followed by a union with the additions.

A document with “remove” and “add” is also allowed, and is interpreted as applying the removals to the LHS set, followed by a union with the additions.

Combining “remove” and “intersect” within the same document is prohibited.

Set additions are deeply merged. This makes sets behave like associative maps, where the “value” of a set member can be updated by adding it to the set again, with a reducible update.

Sets may be objects, in which case the object property serves as the set item key:

collections:
  - name: example/reductions/set
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        value:
          # Sets are always represented as an object.
          type: object
          reduce: { strategy: set }
          # Schema for "add", "intersect", & "remove" properties
          # (each a map of keys and their associated sums):
          additionalProperties:
            type: object
            additionalProperties:
              type: number
              reduce: { strategy: sum }
            # Flow requires that all parents of locations with a reduce
            # annotation also have one themselves.
            # This strategy therefore must (currently) be here, but is ignored.
            reduce: { strategy: lastWriteWins }

      required: [key]
    key: [/key]

tests:
  "Expect we can apply set operations to incrementally build associative maps":
    - ingest:
        collection: example/reductions/set
        documents:
          - { key: "key", value: { "add": { "a": 1, "b": 1, "c": 1 } } }
          - { key: "key", value: { "remove": { "b": 0 } } }
          - { key: "key", value: { "add": { "a": 1, "d": 1 } } }
    - verify:
        collection: example/reductions/set
        documents:
          - { key: "key", value: { "add": { "a": 2, "c": 1, "d": 1 } } }
    - ingest:
        collection: example/reductions/set
        documents:
          - { key: "key", value: { "intersect": { "a": 0, "d": 0 } } }
          - { key: "key", value: { "add": { "a": 1, "e": 1 } } }
    - verify:
        collection: example/reductions/set
        documents:
          - { key: "key", value: { "add": { "a": 3, "d": 1, "e": 1 } } }

Sets can also be sorted arrays, which are ordered using a provided key extractor. Keys are given as one or more JSON pointers, each relative to the item. As with “merge”, arrays must be pre-sorted and de-duplicated by the key, and set reductions always maintain this invariant.

Use a key extractor of [“”] to apply the natural ordering of scalar values.
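
For example, here’s a hedged sketch of a set whose members are scalar strings ordered by their own values; the collection name is hypothetical:

collections:
  - name: example/reductions/set-scalar-array
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        value:
          # Sets are always represented as an object.
          type: object
          reduce:
            strategy: set
            key: [""]
          # "add", "intersect", & "remove" are each sorted arrays of strings.
          additionalProperties:
            type: array
            items: { type: string }
            # Mirrors the fuller example below; ignored by the set strategy.
            reduce: { strategy: lastWriteWins }
      required: [key]
    key: [/key]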

Whether array or object types are used, the type must always be consistent across the “add” / “intersect” / “remove” terms of both sides of the reduction.

collections:
  - name: example/reductions/set-array
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        value:
          # Sets are always represented as an object.
          type: object
          reduce:
            strategy: set
            key: [/0]
          # Schema for "add", "intersect", & "remove" properties
          # (each a sorted array of [key, sum] 2-tuples):
          additionalProperties:
            type: array
            # Flow requires that all parents of locations with a reduce
            # annotation also have one themselves.
            # This strategy therefore must (currently) be here, but is ignored.
            reduce: { strategy: lastWriteWins }
            # Schema for contained [key, sum] 2-tuples:
            items:
              type: array
              items:
                - type: string
                - type: number
                  reduce: { strategy: sum }
              reduce: { strategy: merge }

      required: [key]
    key: [/key]

tests:
  ? "Expect we can apply operations of sorted-array sets to incrementally build associative maps"
  : - ingest:
        collection: example/reductions/set-array
        documents:
          - { key: "key", value: { "add": [["a", 1], ["b", 1], ["c", 1]] } }
          - { key: "key", value: { "remove": [["b", 0]] } }
          - { key: "key", value: { "add": [["a", 1], ["d", 1]] } }
    - verify:
        collection: example/reductions/set-array
        documents:
          - { key: "key", value: { "add": [["a", 2], ["c", 1], ["d", 1]] } }
    - ingest:
        collection: example/reductions/set-array
        documents:
          - { key: "key", value: { "intersect": [["a", 0], ["d", 0]] } }
          - { key: "key", value: { "add": [["a", 1], ["e", 1]] } }
    - verify:
        collection: example/reductions/set-array
        documents:
          - { key: "key", value: { "add": [["a", 3], ["d", 1], ["e", 1]] } }

sum

sum reduces two numbers or integers by adding their values.

collections:
  - name: example/reductions/sum
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        key: { type: string }
        value:
          # Sum only works with types "number" or "integer".
          # Others will error at build time.
          type: number
          reduce: { strategy: sum }
      required: [key]
    key: [/key]

tests:
  "Expect we can sum two numbers":
    - ingest:
        collection: example/reductions/sum
        documents:
          - { key: "key", value: 5 }
          - { key: "key", value: -1.2 }
    - verify:
        collection: example/reductions/sum
        documents:
          - { key: "key", value: 3.8 }

Composing with Conditionals

Reduction strategies are JSON Schema annotations, and as such their applicability at a given document location can be controlled through the use of conditional keywords within the schema, like oneOf or if/then/else. This means Flow’s built-in strategies above can be combined with schema conditionals to construct a wider variety of custom reduction behaviors.

For example, here’s a resettable counter:

collections:
  - name: example/reductions/sum-reset
    schema:
      type: object
      properties:
        key: { type: string }
        value: { type: number }
      required: [key]
      # Use oneOf to express a tagged union over "action".
      oneOf:
        # When action = reset, reduce by taking this document.
        - properties: { action: { const: reset } }
          reduce: { strategy: lastWriteWins }
        # When action = sum, reduce by summing "value". Keep the LHS "action",
        # preserving a LHS "reset", so that resets are properly associative.
        - properties:
            action:
              const: sum
              reduce: { strategy: firstWriteWins }
            value: { reduce: { strategy: sum } }
          reduce: { strategy: merge }
    key: [/key]

tests:
  "Expect we can sum or reset numbers":
    - ingest:
        collection: example/reductions/sum-reset
        documents:
          - { key: "key", action: sum, value: 5 }
          - { key: "key", action: sum, value: -1.2 }
    - verify:
        collection: example/reductions/sum-reset
        documents:
          - { key: "key", value: 3.8 }
    - ingest:
        collection: example/reductions/sum-reset
        documents:
          - { key: "key", action: reset, value: 0 }
          - { key: "key", action: sum, value: 1.3 }
    - verify:
        collection: example/reductions/sum-reset
        documents:
          - { key: "key", value: 1.3 }