Products CSV Ingestion

We’ll walk through how to populate our “products” Collection from a CSV file. Here’s the schema for a product:

product.schema.yaml
description: "A product that is available for purchase"
type: object
properties:
  id: { type: integer }
  name: { type: string }
  price: { type: number }
required: [id, name, price]

We want to ingest a CSV containing all our products from the old system. We do this by sending the CSV over a websocket to flow-ingester, which converts each CSV row to a JSON document and adds it to our products Collection. Here’s a sample of our CSV data:

products.csv
product_num,price,name
1,0.79,Fruit Rollup
2,0.89,Fruit by the Foot

The price and name columns match the properties in our schema exactly, so they map directly to the fields of the same name in the final JSON document. The product_num column has no matching property, so we’ll need to tell Flow that it should be mapped to the id field. We do this by adding a projection to our products Collection:

  - name: examples/shopping/products
    schema: product.schema.yaml
    key: [/id]
    projections:
      product_num: /id
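
To make the mapping concrete, here is a minimal Python sketch of how a CSV row could be turned into a document that satisfies the product schema. It is an illustration only, not flow-ingester's implementation: the names SCHEMA_TYPES, PROJECTIONS, row_to_document, and csv_to_documents are hypothetical, the type table is a hand-written stand-in for product.schema.yaml, and only top-level JSON pointers such as /id are handled.

```python
import csv
import io

# Hypothetical, simplified stand-ins for the schema's property types and for
# the projection in the collection definition. Not Flow's actual code.
SCHEMA_TYPES = {"id": int, "name": str, "price": float}
PROJECTIONS = {"product_num": "/id"}  # CSV column -> JSON pointer


def row_to_document(row):
    """Convert one CSV row (a dict of strings) into a typed dict."""
    doc = {}
    for column, raw in row.items():
        # Use the projection when one exists; otherwise assume the column
        # name is also the name of a top-level property.
        pointer = PROJECTIONS.get(column, "/" + column)
        field = pointer.lstrip("/")  # only top-level pointers are handled
        doc[field] = SCHEMA_TYPES[field](raw)  # coerce the CSV string
    return doc


def csv_to_documents(text):
    """Parse CSV text with a header row into a list of documents."""
    return [row_to_document(row) for row in csv.DictReader(io.StringIO(text))]


SAMPLE = "product_num,price,name\n1,0.79,Fruit Rollup\n2,0.89,Fruit by the Foot\n"
documents = csv_to_documents(SAMPLE)
```

In this sketch the price and name columns pass through under their own names, while product_num lands in id because of the projection. Without that entry, the row would produce no id property and would fail the schema's required check.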

With this projection in place, we can send the CSV to flow-ingester over a websocket:

cat products.csv | websocat --protocol csv/v1 'ws://localhost:8081/ingest/examples/shopping/products'

We’ll see the usual JSON response from flow-ingester. For larger CSVs, we may see many such responses, because flow-ingester breaks the input down into multiple smaller transactions.
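
The idea of splitting one large upload into several transactions can be pictured with the following Python sketch. It only shows generic batching: the batches helper and the batch size of 2 are hypothetical, and how flow-ingester actually decides where one transaction ends is not described here.

```python
from itertools import islice


def batches(rows, size):
    """Yield successive lists of at most `size` rows."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Five illustrative rows split with a hypothetical batch size of 2. Each
# chunk stands for one transaction, and so for one response.
rows = [{"id": n} for n in range(1, 6)]
chunks = list(batches(rows, 2))
```

Under these assumptions, five rows yield three chunks, which corresponds to three separate responses rather than one.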