Shopping Carts

At this point, we have Captured collections set up for users and products, and we’re ready to start building the shopping carts. We’ll start by defining a Captured collection for cart updates, which will hold a record of each time a user adds or modifies a product in their cart. These cart updates will then be joined with the product information so that we have the price of each item. Finally, we’ll create a “carts” collection that rolls up all the joined updates into a single document that includes all the items in a user’s cart, along with their prices.

Summing Item Quantities

Here’s the collection and schema for the cart updates:

cart-updates.flow.yaml
collections:
  - name: examples/shopping/cartUpdates
    schema: cart-update.schema.yaml
    key: [/userId, /productId]

cart-update.schema.yaml
type: object
description: Represents a request from a user to add or remove a product in their cart.
properties:
  userId: { type: integer }
  productId: { type: integer }
  quantity:
    description: The amount to adjust, which can be negative to remove items.
    type: integer
    reduce: { strategy: sum }
required: [userId, productId, quantity]
reduce: { strategy: merge }

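Each cartUpdate document represents a request to add or subtract a quantity of a particular product in a user’s cart. If we get multiple updates for the same user and product, the quantities will be summed. This happens because the quantity property in the above schema carries a reduce annotation with the sum strategy, and documents that share the collection key [/userId, /productId] are reduced together.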
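To make the effect of the sum annotation concrete, here is a minimal TypeScript sketch that combines updates sharing the same userId and productId. The CartUpdate type and the reduceCartUpdates function are hypothetical names invented for illustration. The code only mimics the outcome of the reduction and is not how Flow implements it.

```typescript
// Hypothetical types and helper names. This only mimics the effect of the
// `sum` and `merge` reduce annotations; it is not Flow's implementation.
interface CartUpdate {
  userId: number;
  productId: number;
  quantity: number;
}

// Combine updates that share the collection key [/userId, /productId].
function reduceCartUpdates(updates: CartUpdate[]): CartUpdate[] {
  const byKey: Record<string, CartUpdate> = {};
  for (const update of updates) {
    const key = `${update.userId}/${update.productId}`;
    const existing = byKey[key];
    if (existing) {
      // `reduce: { strategy: sum }` on quantity: add the new adjustment.
      existing.quantity += update.quantity;
    } else {
      // First document seen for this key: keep a copy of it.
      byKey[key] = { ...update };
    }
  }
  // Keys are returned in first-seen order.
  return Object.keys(byKey).map((key) => byKey[key]);
}

const reduced = reduceCartUpdates([
  { userId: 1, productId: 2222, quantity: 2 },
  { userId: 1, productId: 333, quantity: 1 },
  { userId: 1, productId: 2222, quantity: 1 },
  { userId: 1, productId: 333, quantity: -1 },
]);
console.log(reduced);
```

In this sketch, the two updates for product 2222 collapse into one document with a quantity of 3, and the add followed by a remove for product 333 nets out to a quantity of 0.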

Joining Cart Updates and Products

We’ll define a new derived collection that performs a streaming inner join of cartUpdate to product documents.

  - name: examples/shopping/cartUpdatesWithProducts
    key: [/action/userId, /product/id]
    schema:
      type: object
      properties:
        action: { $ref: cart-update.schema.yaml }
        product: { $ref: product.schema.yaml }
      required: [action, product]
      reduce: { strategy: lastWriteWins }

    derivation:
      register:
        initial: null
        schema:
          oneOf:
            - { $ref: product.schema.yaml }
            - { type: "null" }

      transform:
        products:
          source:
            name: examples/shopping/products
          update:
            nodeJS: |
              return [source]

        cartUpdates:
          source:
            name: examples/shopping/cartUpdates
          # Setting the shuffle key to "[/productId]" means that for each cartUpdate document from
          # the source, Flow will use the value of the productId field to look up its associated
          # register value.
          shuffle: [/productId]
          publish:
            nodeJS: |
              // The register schema says this might be null, so we need to deal with that here.
              // If we haven't seen a product with this id, then we simply don't publish. This makes
              // it an inner join.
              if (register) {
                // The ! in register! is a typescript non-null assertion. It's required since
                // the register schema says it may be null, and safe here because we checked.
                return [{action: source, product: register!}];
              }
              return [];

There are two main concepts being used here: the register, and the transforms that read and update it. We’ll start with the register. We’ll have a unique register value for each product id, since that’s the value we’re joining on. For each product id, the register value can either be a product document or null, and the initial value is always null.

Now let’s look at the two transforms, starting with products. This transform reads documents from the products collection and updates the register for each one. The default shuffle key is implicitly the key of the source collection, which in this case is the /id field of a product. Returning [source] adds the source product to the set of register values: we simply return the value(s) that we’d like to be saved in registers, rather than calling some sort of “save” function. The key for each value must be included in the document itself, and this is verified at build/compile time. We always return a single value here because we’re doing a 1-1 join.

Whenever a document is read from the cartUpdates collection, the cartUpdates transform will read the current value of the register and publish a new document that includes both the cartUpdate event and the product it joined to. If the register value is not null, then it means that the products update lambda has observed a product with this id, and we’ll emit a new cartUpdatesWithProducts document. This is what makes it an inner join, since we only return a document if the register is not null.
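The interplay of the two transforms can be sketched outside of Flow in plain TypeScript. Everything below is a simplified stand-in: the registers object and the onProduct and onCartUpdate functions are hypothetical names, and a missing entry plays the role of the initial null register value.

```typescript
// Hypothetical, simplified stand-ins for Flow's registers and lambdas.
interface Product { id: number; name: string; price: number }
interface CartUpdate { userId: number; productId: number; quantity: number }
interface JoinedUpdate { action: CartUpdate; product: Product }

// One register per product id. A missing entry plays the role of the
// initial `null` value.
const registers: Record<number, Product | null> = {};

// Mirrors the `products` update lambda: save the product under its own id.
function onProduct(source: Product): void {
  registers[source.id] = source;
}

// Mirrors the `cartUpdates` publish lambda: look up the register using the
// shuffle key (productId) and emit only when a product has been seen.
function onCartUpdate(source: CartUpdate): JoinedUpdate[] {
  const register = registers[source.productId] ?? null;
  if (register) {
    return [{ action: source, product: register }];
  }
  return [];
}

// A cart update that arrives before its product publishes nothing.
const early = onCartUpdate({ userId: 1, productId: 333, quantity: 1 });
onProduct({ id: 333, name: "Fruit Rollup", price: 0.79 });
// The same update after the product is known publishes one joined document.
const late = onCartUpdate({ userId: 1, productId: 333, quantity: 1 });
console.log(early.length, late.length);
```

In this sketch, a cart update that is read before its product is dropped rather than held back, which is the behavior the publish lambda above describes.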

Rolling Up Carts

Now that we have the product information joined to each item, we’re ready to aggregate all of the joined documents into a single cart for each user. This is an excellent use case for the set reduction strategy. In this case, we’re going to apply the reduction annotations to the register schema, and leave the collection schema as lastWriteWins. This means that the state will accumulate in the register (one per userId), and the collection documents will each reflect the last known state.

Initially, all we’ll need is a single transform:

import:
  - cart-updates-with-products.flow.yaml
  - cart-purchase-requests.flow.yaml

collections:
  - name: examples/shopping/carts
    schema: cart.schema.yaml
    key: [/userId]
    derivation:
      register:
        initial: { userId: 0, cartItems: {} }
        schema:
          type: object
          properties:
            userId: { type: integer }
            cartItems:
              type: object
              reduce: { strategy: set, key: [/product/id] }
              additionalProperties:
                type: array
                items: { $ref: cart.schema.yaml#/$defs/cartItem }
          required: [userId, cartItems]
          reduce: { strategy: merge }

      transform:
        cartUpdatesWithProducts:
          source:
            name: examples/shopping/cartUpdatesWithProducts
          shuffle:
            - /action/userId
          update:
            nodeJS: |
              return [{
                userId: source.action.userId,
                cartItems: {
                  add: [source]
                }
              }]
          publish:
            nodeJS: |
              return [{
                userId: register.userId,
                items: register.cartItems!.add!,
              }]

In the update lambda, we’re adding the combined update-product document to the cartItems. The use of the set reduction strategy means that the item we provided will be deeply merged with the existing set, where items are matched on /product/id. So if there’s already a product with the same id in the set, the two items are merged, and the sum reduction strategy applies to the quantity.
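As a rough illustration of that merge, the following TypeScript sketch folds a sequence of update-lambda results into a single register value. It rests on several assumptions: cart.schema.yaml is not shown here, so the CartItem shape is inferred from the joined documents; the initial register uses an empty add list for simplicity; the ordering of items by key is inferred from the order of items in the test's expected output; and reduceRegister and joined are hypothetical helper names.

```typescript
// Hypothetical shapes: cart.schema.yaml is not shown in this section, so the
// CartItem type below is an assumption based on the joined documents.
interface Product { id: number; name: string; price: number }
interface CartUpdate { userId: number; productId: number; quantity: number }
interface CartItem { action: CartUpdate; product: Product }
interface CartRegister { userId: number; cartItems: { add: CartItem[] } }

// Approximates reducing an update-lambda result into the current register.
function reduceRegister(current: CartRegister, update: CartRegister): CartRegister {
  // Copy so the previous register value is left untouched.
  const items: CartItem[] = current.cartItems.add.map((item) => ({
    action: { ...item.action },
    product: item.product,
  }));
  for (const incoming of update.cartItems.add) {
    // `key: [/product/id]` decides whether two items are the same element.
    const match = items.filter((item) => item.product.id === incoming.product.id)[0];
    if (match) {
      // Same key: merge the items, summing quantity and keeping the latest product.
      match.action.quantity += incoming.action.quantity;
      match.product = incoming.product;
    } else {
      items.push({ action: { ...incoming.action }, product: incoming.product });
    }
  }
  // Assumption: items are kept ordered by key, as the expected test output suggests.
  items.sort((a, b) => a.product.id - b.product.id);
  return { userId: update.userId, cartItems: { add: items } };
}

// Build the value that the update lambda would return for one joined document.
function joined(product: Product, quantity: number): CartRegister {
  const action = { userId: 1, productId: product.id, quantity };
  return { userId: 1, cartItems: { add: [{ action, product }] } };
}

const rollup: Product = { id: 333, name: "Fruit Rollup", price: 0.79 };
const byTheFoot: Product = { id: 2222, name: "Fruit By The Foot", price: 0.89 };
const gushers: Product = { id: 4004, name: "Gushers", price: 2.95 };

let register: CartRegister = { userId: 0, cartItems: { add: [] } };
const updates = [
  joined(byTheFoot, 2), joined(gushers, 1), joined(rollup, 1),
  joined(byTheFoot, 1), joined(rollup, -1),
];
for (const update of updates) {
  register = reduceRegister(register, update);
}
const summary = register.cartItems.add.map((item) => `${item.product.id}:${item.action.quantity}`);
console.log(summary.join(" ")); // prints "333:0 2222:3 4004:1"
```

The sequence of updates used here is the same one the test case ingests, so the resulting quantities (0, 3, and 1) line up with the cart that the test verifies.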

Let’s take a look at a test case that demonstrates it working end to end. Here we’re ingesting some products followed by a series of cart updates. Then we verify the final cart.

cart-tests.flow.yaml
import:
  - carts.flow.yaml
  - cart-updates.flow.yaml
  - products.flow.yaml

tests:
  "shopping cart is populated from cartUpdates":
    - ingest:
        collection: examples/shopping/products
        documents:
          - { id: 333, name: "Fruit Rollup", price: 0.79 }
          - { id: 2222, name: "Fruit By The Foot", price: 0.89 }
          - { id: 4004, name: "Gushers", price: 2.95 }

    - ingest:
        collection: examples/shopping/cartUpdates
        documents:
          - userId: 1
            productId: 2222
            quantity: 2
          - userId: 1
            productId: 4004
            quantity: 1
          - userId: 1
            productId: 333
            quantity: 1
          - userId: 1
            productId: 2222
            quantity: 1
          - userId: 1
            productId: 333
            quantity: -1

    - verify:
        collection: examples/shopping/carts
        documents:
          - userId: 1
            items:
              - product:
                  id: 333
                  name: Fruit Rollup
                  price: 0.79
                action: { quantity: 0 }
              - product:
                  id: 2222
                  name: Fruit By The Foot
                  price: 0.89
                action: { quantity: 3 }
              - product:
                  id: 4004
                  name: Gushers
                  price: 2.95
                action: { quantity: 1 }