Shopping Carts
At this point, we have Captured collections set up for users and products, and we’re ready to start building the shopping carts. We’ll start by defining a Captured collection for cart updates, which will hold a record of each time a user adds or modifies a product in their cart. These cart updates will then be joined with the product information so that we’ll have the price of each item. Then we’ll create a “carts” collection that rolls up all the joined updates into a single document that includes all the items in a user’s cart, along with their prices.
Summing Item Quantities
Here’s the collection and schema for the cart updates:
collections:
  - name: examples/shopping/cartUpdates
    schema: cart-update.schema.yaml
    key: [/userId, /productId]

type: object
description: Represents a request from a user to add or remove a product in their cart.
properties:
  userId: { type: integer }
  productId: { type: integer }
  quantity:
    description: The amount to adjust, which can be negative to remove items.
    type: integer
    reduce: { strategy: sum }
required: [userId, productId, quantity]
reduce: { strategy: merge }
Each cartUpdate document represents a request to add or subtract a quantity of a particular product in a user’s cart. If we get multiple updates for the same user and product, the quantities will be summed. This is because of the reduce annotation on the quantity field in the above schema.
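To make the summing behavior concrete, here is a minimal TypeScript sketch of what reducing on the key [/userId, /productId] amounts to. It is an in-memory illustration only: the reduceCartUpdates function and the CartUpdate interface are hypothetical names made up for this example and are not part of Flow.

```typescript
// Hypothetical in-memory model of keyed reduction; not Flow's implementation.
interface CartUpdate {
  userId: number;
  productId: number;
  quantity: number;
}

// Combine updates that share the collection key [/userId, /productId].
// This mirrors `merge` on the document with `sum` on the quantity field.
function reduceCartUpdates(updates: CartUpdate[]): CartUpdate[] {
  const byKey = new Map<string, CartUpdate>();
  for (const update of updates) {
    const key = `${update.userId}/${update.productId}`;
    const prev = byKey.get(key);
    byKey.set(
      key,
      prev ? { ...prev, quantity: prev.quantity + update.quantity } : { ...update },
    );
  }
  return Array.from(byKey.values());
}

const reducedUpdates = reduceCartUpdates([
  { userId: 1, productId: 2222, quantity: 2 },
  { userId: 1, productId: 333, quantity: 1 },
  { userId: 1, productId: 2222, quantity: 1 },
  { userId: 1, productId: 333, quantity: -1 },
]);
// One document remains per key: product 2222 with quantity 3,
// and product 333 with quantity 0.
```

Note that a negative quantity is just another value to sum, which is why removing an item leaves a document with quantity 0 rather than deleting it.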
Joining Cart Updates and Products
We’ll define a new derived collection that performs a streaming inner join of cartUpdate to product documents.
  - name: examples/shopping/cartUpdatesWithProducts
    key: [/action/userId, /product/id]
    schema:
      type: object
      properties:
        action: { $ref: cart-update.schema.yaml }
        product: { $ref: product.schema.yaml }
      required: [action, product]
      reduce: { strategy: lastWriteWins }

    derivation:
      register:
        initial: null
        schema:
          oneOf:
            - { $ref: product.schema.yaml }
            - { type: "null" }
      transform:
        products:
          source:
            name: examples/shopping/products
          update:
            nodeJS: |
              return [source]

        cartUpdates:
          source:
            name: examples/shopping/cartUpdates
          # Setting the shuffle key to "[/productId]" means that for each cartUpdate document from
          # the source, Flow will use the value of the productId field to look up its associated
          # register value.
          shuffle: [/productId]
          publish:
            nodeJS: |
              // The register schema says this might be null, so we need to deal with that here.
              // If we haven't seen a product with this id, then we simply don't publish. This makes
              // it an inner join.
              if (register) {
                // The ! in register! is a typescript non-null assertion. It's required since
                // the register schema says it may be null, and safe here because we checked.
                return [{action: source, product: register!}];
              }
              return [];
There are two main concepts being used here. The first is the register. We’ll have a unique register value for each product id, since that’s the value we’re joining on. For each product id, the register value can either be a product document or null, and the initial value is always null.
Now let’s look at the two transforms, starting with products. This will read documents from the products collection, and update the register for each one. Note that the default shuffle key is implicitly the key of the source collection, in this case the /id field of a product. The return value of [source] simply adds the source product to the set of register values. We return the value(s) that we’d like to be saved in registers, rather than calling some sort of “save” function. The key for each value must be included in the document itself, and this is verified at build/compile time. We always return a single value here because we’re doing a 1-1 join.
Whenever a document is read from the cartUpdates collection, the cartUpdates transform will read the current value of the register and publish a new document that includes both the cartUpdate event and the product it joined to. If the register value is not null, then it means that the products update lambda has observed a product with this id, and we’ll emit a new cartUpdatesWithProducts document. This is what makes it an inner join, since we only return a document if the register is not null.
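The interplay between the two transforms can be sketched in plain TypeScript. This is a simplified in-memory model under stated assumptions, not Flow’s runtime: the ProductJoin class and its onProduct and onCartUpdate methods are hypothetical names, and a Map stands in for the per-product-id registers.

```typescript
interface Product { id: number; name: string; price: number }
interface CartUpdate { userId: number; productId: number; quantity: number }
interface JoinedUpdate { action: CartUpdate; product: Product }

// Hypothetical stand-in for the derivation; not Flow's API.
class ProductJoin {
  // One register per product id. A missing entry plays the role of
  // the initial null register value.
  private registers = new Map<number, Product | null>();

  // Models the "products" transform: the update lambda returns [source],
  // which is stored under the source collection's key (/id).
  onProduct(source: Product): void {
    this.registers.set(source.id, source);
  }

  // Models the "cartUpdates" transform: shuffle on /productId to find the
  // register, then publish only if a product has been observed.
  onCartUpdate(source: CartUpdate): JoinedUpdate[] {
    const register = this.registers.get(source.productId) ?? null;
    if (register) {
      return [{ action: source, product: register }];
    }
    return []; // No product yet, so nothing is published (inner join).
  }
}

const productJoin = new ProductJoin();
productJoin.onProduct({ id: 333, name: "Fruit Rollup", price: 0.79 });

// Product 333 has been seen, so this update is joined and published.
const matched = productJoin.onCartUpdate({ userId: 1, productId: 333, quantity: 1 });
// Product 9999 has never been seen, so this update publishes nothing.
const unmatched = productJoin.onCartUpdate({ userId: 1, productId: 9999, quantity: 1 });
```

In this model, a cart update that arrives before its product is simply dropped, which is the same consequence the null check has in the publish lambda above.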
Rolling Up Carts
Now that we have the product information joined to each item, we’re ready to aggregate all of the joined documents into a single cart for each user. This is an excellent use case for the set reduction strategy. In this case, we’re going to apply the reduction annotations to the register schema, and leave the collection schema as lastWriteWins. This means that the state will accumulate in the register (one per userId), and the collection documents will each reflect the last known state.
Initially, all we’ll need is a single transform:
import:
  - cart-updates-with-products.flow.yaml
  - cart-purchase-requests.flow.yaml

collections:
  - name: examples/shopping/carts
    schema: cart.schema.yaml
    key: [/userId]
    derivation:
      register:
        initial: { userId: 0, cartItems: {} }
        schema:
          type: object
          properties:
            userId: { type: integer }
            cartItems:
              type: object
              reduce: { strategy: set, key: [/product/id] }
              additionalProperties:
                type: array
                items: { $ref: cart.schema.yaml#/$defs/cartItem }
          required: [userId, cartItems]
          reduce: { strategy: merge }
      transform:
        cartUpdatesWithProducts:
          source:
            name: examples/shopping/cartUpdatesWithProducts
          shuffle:
            - /action/userId
          update:
            nodeJS: |
              return [{
                userId: source.action.userId,
                cartItems: {
                  add: [source]
                }
              }]
          publish:
            nodeJS: |
              return [{
                userId: register.userId,
                items: register.cartItems!.add!,
              }]
In the update lambda, we’re adding the combined update-product document to the cartItems. The use of the set reduction strategy means that the item we provided will be deeply merged with the existing set. So if there’s already a product with the same id in the set, then the sum reduction strategy will apply to its quantity.
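The following TypeScript sketch models how the register accumulates a cart. It is an approximation under stated assumptions rather than Flow’s implementation: reduceSetAdd, onJoinedUpdate, and cartRegisters are hypothetical names, the initial register uses an empty add array for simplicity, and items are kept ordered by product id to match the order in which the test below lists them.

```typescript
// Hypothetical model of the carts derivation; not Flow's runtime API.
interface Product { id: number; name: string; price: number }
interface CartItem {
  action: { userId: number; productId: number; quantity: number };
  product: Product;
}
interface CartRegister { userId: number; cartItems: { add: CartItem[] } }

// Merge incoming items into an existing set keyed by /product/id.
// Items sharing a key are merged: quantity is summed, the product replaced.
function reduceSetAdd(existing: CartItem[], incoming: CartItem[]): CartItem[] {
  const byId = new Map<number, CartItem>();
  for (const item of existing) byId.set(item.product.id, item);
  for (const item of incoming) {
    const prev = byId.get(item.product.id);
    const quantity = (prev ? prev.action.quantity : 0) + item.action.quantity;
    byId.set(item.product.id, {
      product: item.product,
      action: { ...item.action, quantity },
    });
  }
  return Array.from(byId.values()).sort((a, b) => a.product.id - b.product.id);
}

// One register per userId, matching the shuffle key /action/userId.
const cartRegisters = new Map<number, CartRegister>();

// Reduce the update lambda's return value into the register, then publish
// a document reflecting the register's current state.
function onJoinedUpdate(source: CartItem): { userId: number; items: CartItem[] } {
  const userId = source.action.userId;
  const register: CartRegister =
    cartRegisters.get(userId) ?? { userId: 0, cartItems: { add: [] } };
  const next: CartRegister = {
    userId,
    cartItems: { add: reduceSetAdd(register.cartItems.add, [source]) },
  };
  cartRegisters.set(userId, next);
  return { userId: next.userId, items: next.cartItems.add };
}

const gushers: Product = { id: 4004, name: "Gushers", price: 2.95 };
const rollup: Product = { id: 333, name: "Fruit Rollup", price: 0.79 };
const joined = (product: Product, quantity: number): CartItem => ({
  action: { userId: 1, productId: product.id, quantity },
  product,
});

onJoinedUpdate(joined(gushers, 1));
onJoinedUpdate(joined(rollup, 1));
const cart = onJoinedUpdate(joined(rollup, -1));
// cart.items now holds Fruit Rollup with quantity 0, then Gushers with quantity 1.
```

Each call publishes the whole cart as of that moment, which is why the collection schema can stay lastWriteWins: the most recent document for a userId is always the complete cart.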
Let’s take a look at a test case that demonstrates it working end to end. Here we’re ingesting some products followed by a series of cart updates. Then we verify the final cart.
import:
  - carts.flow.yaml
  - cart-updates.flow.yaml
  - products.flow.yaml

tests:
  "shopping cart is populated from cartUpdates":
    - ingest:
        collection: examples/shopping/products
        documents:
          - { id: 333, name: "Fruit Rollup", price: 0.79 }
          - { id: 2222, name: "Fruit By The Foot", price: 0.89 }
          - { id: 4004, name: "Gushers", price: 2.95 }
    - ingest:
        collection: examples/shopping/cartUpdates
        documents:
          - userId: 1
            productId: 2222
            quantity: 2
          - userId: 1
            productId: 4004
            quantity: 1
          - userId: 1
            productId: 333
            quantity: 1
          - userId: 1
            productId: 2222
            quantity: 1
          - userId: 1
            productId: 333
            quantity: -1
    - verify:
        collection: examples/shopping/carts
        documents:
          - userId: 1
            items:
              - product:
                  id: 333
                  name: Fruit Rollup
                  price: 0.79
                action: { quantity: 0 }
              - product:
                  id: 2222
                  name: Fruit By The Foot
                  price: 0.89
                action: { quantity: 3 }
              - product:
                  id: 4004
                  name: Gushers
                  price: 2.95
                action: { quantity: 1 }
Next
Continue to Handle purchases to learn how to reset the state of a cart.
Check out the Derivation Patterns docs for more examples of how to manage stateful transformations.