Handling Purchases

To put it all together, we’ll create a captured collection for requests to purchase a cart, and a final derived collection to hold the complete purchase details.

Here’s the captured collection and its schema:

collections:
  - name: examples/shopping/cartPurchaseRequests
    schema:
      type: object
      description: "Represents a request from a user to purchase the items in their cart."
      properties:
        userId: { type: integer }
        timestamp:
          type: string
          format: date-time
      required: [userId, timestamp]
    # The timestamp is part of the key in order to uniquely identify multiple purchase requests for
    # the same user.
    key: [/userId, /timestamp]
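
To illustrate why the timestamp belongs in the key, here is a small sketch of two purchase requests from the same user. The values are the ones used by the test cases later in this section; the key annotations in the comments are added for illustration only.

```yaml
# Two requests from user 1. Because the timestamps differ, the keys differ,
# so the documents are treated as two distinct purchase requests.
- userId: 1
  timestamp: '2020-01-04 15:22:01'   # key: [1, '2020-01-04 15:22:01']
- userId: 1
  timestamp: '2020-01-04 15:30:44'   # key: [1, '2020-01-04 15:30:44']
```

Had the key been [/userId] alone, both documents would share the key [1] and could no longer be told apart by key.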

We’ll read these purchase requests in a couple of places. First, we’ll create a purchases derivation that stores the most recent cart for each user in a register. When it reads a purchase request, it publishes that user’s complete cart contents from the register, along with the timestamp of the request.

import:
  - carts.flow.yaml
  - cart-purchase-requests.flow.yaml

collections:
  - name: examples/shopping/purchases
    schema: purchase.schema.yaml
    key: [/userId, /timestamp]
    derivation:
      register:
        initial: { userId: 0, items: [] }
        schema: cart.schema.yaml
      transform:
        carts:
          source:
            name: examples/shopping/carts
          update:
            nodeJS: return [source]

        purchaseActions:
          source:
            name: examples/shopping/cartPurchaseRequests
          shuffle: [/userId]
          publish:
            nodeJS: |
              return [{
                userId: register.userId,
                timestamp: source.timestamp,
                items: register.items,
              }]
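
The derivation above references purchase.schema.yaml, which is not shown in this section. A minimal sketch of what such a schema could contain follows, inferred only from the fields the publish lambda returns and from the collection key. The description text and the loose typing of items are assumptions, not the original file.

```yaml
# Hypothetical purchase.schema.yaml (a sketch, not the original file).
type: object
description: "A completed purchase: the cart contents at the time of the request."
properties:
  userId: { type: integer }
  timestamp:
    type: string
    format: date-time
  items:
    type: array
    # Assumption: each entry has the same shape as an item in cart.schema.yaml.
    items: { type: object }
# Both key fields, /userId and /timestamp, must always be present.
required: [userId, timestamp, items]
```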

The timestamp is again part of the key in order to uniquely identify multiple purchases from the same user. If we were to materialize the purchases collection, we’d get a separate row for each purchase. We can see this work end to end in the following test case.

purchase-tests.flow.yaml
import:
  - cart-updates.flow.yaml
  - carts.flow.yaml
  - cart-purchase-requests.flow.yaml
  - purchases.flow.yaml

tests:
  "shopping cart is purchased":
    # The "&products" here is a YAML feature that lets us re-use this object in later steps with
    # "- ingest: *products".
    - ingest: &products
        collection: examples/shopping/products
        documents:
          - { id: 333, name: "Fruit Rollup", price: 0.79 }
          - { id: 2222, name: "Fruit By The Foot", price: 0.89 }
          - { id: 4004, name: "Gushers", price: 2.95 }

    - ingest: &cartItems
        collection: examples/shopping/cartUpdates
        documents:
          - userId: 1
            productId: 2222
            quantity: 2
          - userId: 1
            productId: 4004
            quantity: 1
          - userId: 1
            productId: 333
            quantity: 1
          - userId: 1
            productId: 2222
            quantity: 1
          - userId: 1
            productId: 333
            quantity: -1
    - ingest:
        collection: examples/shopping/cartPurchaseRequests
        documents:
          - userId: 1
            timestamp: '2020-01-04 15:22:01'

    - verify:
        collection: examples/shopping/purchases
        documents:
          - userId: 1
            timestamp: '2020-01-04 15:22:01'
            items:
              - { product: { id: 333 }, action: { quantity: 0 } }
              - { product: { id: 2222 }, action: { quantity: 3 } }
              - { product: { id: 4004 }, action: { quantity: 1 } }
    - verify:
        collection: examples/shopping/carts
        documents:

Finally, we’ll reset the state of a user’s cart after they complete a purchase. Flow allows each collection to have multiple readers, so in addition to the purchases derivation, our carts collection can also read cartPurchaseRequests through a new clearAfterPurchase transform.
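
The transform itself is not reproduced in this section. A minimal sketch of what it might look like follows, written as a fragment that would sit under the transform map of the carts derivation. It assumes the carts register keeps its items in a field named cartItems that is reduced with the set strategy; that field name and the exact shape of the published document are assumptions rather than the original definition.

```yaml
# Hypothetical fragment; it would be placed under `derivation: transform:`
# of the examples/shopping/carts collection.
clearAfterPurchase:
  source:
    name: examples/shopping/cartPurchaseRequests
  # Route each purchase request to the register holding that user's cart.
  shuffle: [/userId]
  update:
    nodeJS: |
      // Assumed register field `cartItems`: intersecting the set with []
      // leaves no items in it.
      return [{ userId: source.userId, cartItems: { intersect: [] } }]
  publish:
    nodeJS: |
      // Emit an empty cart so readers of carts observe that it was cleared.
      return [{ userId: source.userId, items: [] }]
```

The shuffle on /userId mirrors the purchaseActions transform shown earlier, so a purchase request lands on the same register as that user's cart updates.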

The clearAfterPurchase transform has both update and publish lambdas. The update lambda clears the set of items in the register by intersecting it with [], using the same set reduction strategy. The publish lambda ensures that other readers of the carts collection (and materializations) will observe the now-empty cart. This behavior is required for the "cart is cleared after purchase" test case to pass:

  "cart is cleared after purchase":
    # Here *products and *cartItems refer to the same items defined above.
    - ingest: *products
    - ingest: *cartItems
    - ingest:
        collection: examples/shopping/cartPurchaseRequests
        documents:
          - userId: 1
            timestamp: '2020-01-04 15:22:01'

    # Add and purchase one more product to assert we get a separate purchase
    - ingest:
        collection: examples/shopping/cartUpdates
        documents:
          - userId: 1
            productId: 2222
            quantity: 50

    # Verify that the cart doesn't contain any items that were already purchased
    - verify:
        collection: examples/shopping/carts
        documents:
          - userId: 1
            items:
              - product:
                  id: 2222
                  name: Fruit By The Foot
                  price: 0.89
                action:
                  productId: 2222
                  quantity: 50

    - ingest:
        collection: examples/shopping/cartPurchaseRequests
        documents:
          - userId: 1
            timestamp: '2020-01-04 15:30:44'

    # Verify that we have two distinct purchases
    - verify:
        collection: examples/shopping/purchases
        documents:
          - userId: 1
            timestamp: '2020-01-04 15:22:01'
            items:
              - { product: { id: 333 }, action: { quantity: 0 } }
              - { product: { id: 2222 }, action: { quantity: 3 } }
              - { product: { id: 4004 }, action: { quantity: 1 } }
          - userId: 1
            timestamp: '2020-01-04 15:30:44'
            items:
              - { product: { id: 2222 }, action: { quantity: 50 } }

You Made It!

If you’ve made it this far, then you’ve seen all the major elements of the Flow programming model. Some recommended next steps are: