Network Traces

We’ll tackle a few network monitoring tasks using data drawn from a Kaggle dataset of collected network traces.

Caution

This example is a work in progress, and currently more of a sketch. Contributions welcome!

Trace Dataset

Our source dataset contains network traces of source and destination endpoints, packet flows, and byte counts, much like what you’d obtain from tcpdump. It has many repetitions, including multiple records with the same pair of endpoints and the same timestamp.

Source.IP,Source.Port,Destination.IP,Destination.Port,Protocol,Timestamp,Flow.Duration,Total.Fwd.Packets,Total.Backward.Packets,Total.Length.of.Fwd.Packets,Total.Length.of.Bwd.Packets
172.19.1.46,52422,10.200.7.7,3128,6,26/04/201711:11:17,45523,22,55,132,110414
10.200.7.7,3128,172.19.1.46,52422,6,26/04/201711:11:17,1,2,0,12,0
50.31.185.39,80,10.200.7.217,38848,6,26/04/201711:11:17,1,3,0,674,0
50.31.185.39,80,10.200.7.217,38848,6,26/04/201711:11:17,217,1,3,0,0
192.168.72.43,55961,10.200.7.7,3128,6,26/04/201711:11:17,78068,5,0,1076,0
10.200.7.6,3128,172.19.1.56,50004,6,26/04/201711:11:17,105069,136,0,313554,0
192.168.72.43,55963,10.200.7.7,3128,6,26/04/201711:11:17,104443,5,0,1076,0
192.168.10.47,51848,10.200.7.6,3128,6,26/04/201711:11:17,11002,3,12,232,3664
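To make the repetition concrete, here’s a small sketch that groups the sample rows above by endpoint pair and timestamp and reports any group with more than one record. It hard-codes the sample lines and assumes plain comma-separated fields with no quoting; `countRepeats` is a hypothetical helper for illustration, not part of the example’s tooling.

```javascript
// The sample rows above (header omitted). Columns follow the header order:
// Source.IP, Source.Port, Destination.IP, Destination.Port, Protocol, Timestamp, ...
const rows = [
  "172.19.1.46,52422,10.200.7.7,3128,6,26/04/201711:11:17,45523,22,55,132,110414",
  "10.200.7.7,3128,172.19.1.46,52422,6,26/04/201711:11:17,1,2,0,12,0",
  "50.31.185.39,80,10.200.7.217,38848,6,26/04/201711:11:17,1,3,0,674,0",
  "50.31.185.39,80,10.200.7.217,38848,6,26/04/201711:11:17,217,1,3,0,0",
  "192.168.72.43,55961,10.200.7.7,3128,6,26/04/201711:11:17,78068,5,0,1076,0",
  "10.200.7.6,3128,172.19.1.56,50004,6,26/04/201711:11:17,105069,136,0,313554,0",
  "192.168.72.43,55963,10.200.7.7,3128,6,26/04/201711:11:17,104443,5,0,1076,0",
  "192.168.10.47,51848,10.200.7.6,3128,6,26/04/201711:11:17,11002,3,12,232,3664",
];

// Count records per (source endpoint, destination endpoint, timestamp),
// then keep only the combinations that occur more than once.
function countRepeats(lines) {
  const counts = new Map();
  for (const line of lines) {
    const [srcIp, srcPort, dstIp, dstPort, , timestamp] = line.split(",");
    const key = `${srcIp}:${srcPort} -> ${dstIp}:${dstPort} @ ${timestamp}`;
    counts.set(key, (counts.get(key) ?? 0) + 1);
  }
  return new Map([...counts].filter(([, n]) => n > 1));
}

for (const [key, n] of countRepeats(rows)) {
  console.log(`${n} records: ${key}`);
}
```

On these rows, only the two `50.31.185.39:80 -> 10.200.7.217:38848` records collide.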

Capturing Peer-to-Peer Flows

We don’t necessarily want a collection to model the full granularity of the source dataset. Cloud storage is cheap, sure, but we simply don’t need or want multiple records per second, per address pair, and that’s still data we have to examine every time we process the collection.

Instead, we can key on address pairs and lean on reduction annotations to aggregate any repeated records that occur within a single ingestion transaction. Here’s a schema:

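$defs:
  # A numeric counter; documents sharing a key have their counters summed.
  counter:
    type: number
    reduce: { strategy: sum }

  # An IP address and port, identifying one endpoint.
  ip-port:
    type: object
    properties:
      ip: { type: string }
      port: { type: integer }
    required: [ip, port]

  # Packet and byte counters for one direction of a flow, merged property by property.
  stats:
    type: object
    reduce: { strategy: merge }
    properties:
      packets: { $ref: "#/$defs/counter" }
      bytes: { $ref: "#/$defs/counter" }

# A flow between two endpoints. Properties are merged; those without a reduce
# annotation (src, dst, timestamp, protocol) take the most recent value.
type: object
reduce: { strategy: merge }
properties:
  src: { $ref: "#/$defs/ip-port" }
  dst: { $ref: "#/$defs/ip-port" }
  timestamp: { type: string }
  protocol: { enum: [0, 6, 17] }
  millis: { $ref: "#/$defs/counter" }
  fwd: { $ref: "#/$defs/stats" }
  bwd: { $ref: "#/$defs/stats" }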
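The reduce annotations can be read as a recipe for combining two documents that share a key. As a rough sketch (hand-written JavaScript, not Flow’s reduction engine), `sum` adds counters, `merge` combines objects property by property, and properties without an annotation simply take the later value, which assumes Flow’s default last-write-wins behavior for unannotated locations. `mergeWith` and `reducePair` are hypothetical names. Here it folds the two repeated sample records for `50.31.185.39:80`:

```javascript
// Hand-rolled emulation of the schema's reduce annotations for two documents
// that share a collection key. Illustration only: not Flow's reduction engine.

// True if `obj` has its own property `key` (ignores inherited properties).
const has = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key);

// Merge `right` into a copy of `left`. Properties with a reducer are combined when
// both sides have them; any other property takes the right-hand (later) value,
// mirroring last-write-wins for locations without a reduce annotation.
function mergeWith(left, right, reducers) {
  const out = { ...left };
  for (const [key, value] of Object.entries(right)) {
    out[key] = has(out, key) && has(reducers, key) ? reducers[key](out[key], value) : value;
  }
  return out;
}

// `counter`: reduce strategy sum.
const sum = (a, b) => a + b;
// `stats`: reduce strategy merge, with each counter summed.
const reduceStats = (a, b) => mergeWith(a, b, { packets: sum, bytes: sum });
// Top-level pair document: merge, summing millis and merging fwd/bwd stats.
const reducePair = (a, b) => mergeWith(a, b, { millis: sum, fwd: reduceStats, bwd: reduceStats });

// The two repeated sample records for 50.31.185.39:80 -> 10.200.7.217:38848.
const first = {
  src: { ip: "50.31.185.39", port: 80 },
  dst: { ip: "10.200.7.217", port: 38848 },
  protocol: 6,
  timestamp: "26/04/201711:11:17",
  millis: 1,
  fwd: { packets: 3, bytes: 674 },
  bwd: { packets: 0, bytes: 0 },
};
const second = { ...first, millis: 217, fwd: { packets: 1, bytes: 0 }, bwd: { packets: 3, bytes: 0 } };

console.log(JSON.stringify(reducePair(first, second)));
```

Folding those two records yields a `millis` of 218, forward stats of 4 packets and 674 bytes, and backward stats of 3 packets and 0 bytes.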

And a captured collection, keyed on source and destination endpoints, into which we’ll ingest the traces:

collections:
  - name: examples/net-trace/pairs
    key: [/src/ip, /src/port, /dst/ip, /dst/port]
    schema:
      $ref: schema.yaml
      required: [src, dst, protocol, timestamp]

    projections:
      Source.IP: /src/ip
      Source.Port: /src/port
      Destination.IP: /dst/ip
      Destination.Port: /dst/port
      Protocol:
        location: /protocol
        partition: true
      Timestamp: /timestamp
      Flow.Duration: /millis
      Total.Fwd.Packets: /fwd/packets
      Total.Backward.Packets: /bwd/packets
      Total.Length.of.Fwd.Packets: /fwd/bytes
      Total.Length.of.Bwd.Packets: /bwd/bytes
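The projections tie each CSV column to a location in the document. The sketch below shows one way to apply that mapping to a CSV line and pull out the collection key; the naive comma split, the choice of which columns to parse as numbers, and the helper names (`setPointer`, `rowToDocument`, `collectionKey`) are assumptions for illustration, not how flowctl ingests data.

```javascript
// Projections from the collection spec above: CSV column name -> JSON pointer.
const projections = {
  "Source.IP": "/src/ip",
  "Source.Port": "/src/port",
  "Destination.IP": "/dst/ip",
  "Destination.Port": "/dst/port",
  "Protocol": "/protocol",
  "Timestamp": "/timestamp",
  "Flow.Duration": "/millis",
  "Total.Fwd.Packets": "/fwd/packets",
  "Total.Backward.Packets": "/bwd/packets",
  "Total.Length.of.Fwd.Packets": "/fwd/bytes",
  "Total.Length.of.Bwd.Packets": "/bwd/bytes",
};

// Columns the schema types as strings; all others are parsed as numbers (assumption).
const stringColumns = new Set(["Source.IP", "Destination.IP", "Timestamp"]);

// Write `value` at a simple JSON pointer such as "/fwd/bytes", creating parent objects.
// Assumes segments need no "~0"/"~1" unescaping, which holds for these projections.
function setPointer(doc, pointer, value) {
  const parts = pointer.split("/").slice(1);
  let node = doc;
  for (const part of parts.slice(0, -1)) {
    if (!(part in node)) node[part] = {};
    node = node[part];
  }
  node[parts[parts.length - 1]] = value;
}

// Turn one CSV line into a nested document, given the CSV header line.
function rowToDocument(header, line) {
  const fields = line.split(",");
  const doc = {};
  header.split(",").forEach((name, i) => {
    const pointer = projections[name];
    if (pointer === undefined) return; // Column has no projection.
    setPointer(doc, pointer, stringColumns.has(name) ? fields[i] : Number(fields[i]));
  });
  return doc;
}

// The collection key: [/src/ip, /src/port, /dst/ip, /dst/port].
const collectionKey = (doc) => [doc.src.ip, doc.src.port, doc.dst.ip, doc.dst.port];

const header =
  "Source.IP,Source.Port,Destination.IP,Destination.Port,Protocol,Timestamp,Flow.Duration," +
  "Total.Fwd.Packets,Total.Backward.Packets,Total.Length.of.Fwd.Packets,Total.Length.of.Bwd.Packets";
const a = rowToDocument(header, "50.31.185.39,80,10.200.7.217,38848,6,26/04/201711:11:17,1,3,0,674,0");
const b = rowToDocument(header, "50.31.185.39,80,10.200.7.217,38848,6,26/04/201711:11:17,217,1,3,0,0");

console.log(JSON.stringify(collectionKey(a)) === JSON.stringify(collectionKey(b))); // true
```

Because `Timestamp` isn’t part of the key, the two `50.31.185.39:80` rows map to the same key and would be reduced into a single document.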

Kick off a streamed capture:

# Start a local development instance, and leave it running:
$ flowctl develop

# In another terminal:
$ examples/net-trace/load-traces.sh

Service Traffic by Day

A simplistic view that identifies services (endpoints with a port below 1024) and aggregates their network traffic by day:

import:
  - pairs.flow.yaml

# Package lambdas with an NPM dependency on Moment.js.
nodeDependencies:
  moment: "^2.24"

collections:
  - name: examples/net-trace/services
    key: [/date, /service/ip, /service/port]
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        date: { type: string }
        service: { $ref: schema.yaml#/$defs/ip-port }
        stats: { $ref: schema.yaml#/$defs/stats }
      required: [date, service]

    derivation:
      transform:
        fromPairs:
          source: { name: examples/net-trace/pairs }
          publish:
            nodeJS: |
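              // Use moment.js to parse the oddball timestamp format (e.g. "26/04/201711:11:17")
              // and truncate it to the record's calendar date. "HH" parses 24-hour clock hours.
              const date = moment(source.timestamp, "DD/MM/YYYYHH:mm:ss").format('YYYY-MM-DD');
              const out = [];

              // Drop the final octet of an IPv4 address, so hosts sharing a /24 prefix
              // roll up into one service.
              const prefix = (ip) => ip.split('.').slice(0, -1).join('.');

              if (source.src.port < 1024) {
                // The source endpoint is the service: forward stats are traffic it sent.
                out.push({
                  date: date,
                  service: { ip: prefix(source.src.ip), port: source.src.port },
                  stats: source.fwd,
                });
              }
              if (source.dst.port < 1024) {
                // The destination endpoint is the service: backward stats are traffic it sent.
                out.push({
                  date: date,
                  service: { ip: prefix(source.dst.ip), port: source.dst.port },
                  stats: source.bwd,
                });
              }
              return out;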
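To exercise the publish logic outside of Flow, here’s a dependency-free sketch of the same transformation. It swaps moment.js for a regular expression that only handles the dataset’s `DD/MM/YYYYHH:mm:ss` shape, and `toDate`, `prefix`, and `publishServices` are hypothetical names; the input is one of the sample records in the shape of the pairs schema.

```javascript
// Parse the dataset's "DD/MM/YYYYHH:mm:ss" timestamps (e.g. "26/04/201711:11:17")
// and return the calendar date as "YYYY-MM-DD". A regex stands in for moment.js here.
function toDate(timestamp) {
  const match = /^(\d{2})\/(\d{2})\/(\d{4})\d{2}:\d{2}:\d{2}$/.exec(timestamp);
  if (match === null) throw new Error(`unexpected timestamp: ${timestamp}`);
  const [, day, month, year] = match;
  return `${year}-${month}-${day}`;
}

// Drop the final octet of an IPv4 address, e.g. "10.200.7.7" -> "10.200.7".
const prefix = (ip) => ip.split(".").slice(0, -1).join(".");

// Mirror of the publish lambda: emit one services document per endpoint
// with a port below 1024, without mutating the input document.
function publishServices(source) {
  const date = toDate(source.timestamp);
  const out = [];
  if (source.src.port < 1024) {
    out.push({ date, service: { ip: prefix(source.src.ip), port: source.src.port }, stats: source.fwd });
  }
  if (source.dst.port < 1024) {
    out.push({ date, service: { ip: prefix(source.dst.ip), port: source.dst.port }, stats: source.bwd });
  }
  return out;
}

const pair = {
  src: { ip: "50.31.185.39", port: 80 },
  dst: { ip: "10.200.7.217", port: 38848 },
  protocol: 6,
  timestamp: "26/04/201711:11:17",
  millis: 1,
  fwd: { packets: 3, bytes: 674 },
  bwd: { packets: 0, bytes: 0 },
};

console.log(JSON.stringify(publishServices(pair)));
// [{"date":"2017-04-26","service":{"ip":"50.31.185","port":80},"stats":{"packets":3,"bytes":674}}]
```

A pair whose endpoints both have ports below 1024 would publish two documents, one per service.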

Materialize to a test database:

$ flowctl materialize --collection examples/net-trace/services --table-name services --target testDB
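Assuming the materialized `services` table holds one row per collection key (date, service IP, service port), each row should reflect every published document for that key, folded by the reduce annotations. The sketch below models only that folding, not the database or the connector; `rollUp` is a hypothetical helper, and the third input document is made up for illustration.

```javascript
// Fold services documents that share the key [/date, /service/ip, /service/port],
// summing stats.packets and stats.bytes as the `stats` reduce annotations do.
// Illustration only: this models the reduction, not the materialization connector.
function rollUp(docs) {
  const byKey = new Map();
  for (const doc of docs) {
    const key = JSON.stringify([doc.date, doc.service.ip, doc.service.port]);
    const stats = doc.stats ?? {};
    const prev = byKey.get(key);
    if (prev === undefined) {
      // First document for this key: copy it so the inputs aren't mutated.
      byKey.set(key, { date: doc.date, service: { ...doc.service }, stats: { ...stats } });
      continue;
    }
    // Later documents for the same key: sum each counter that is present.
    for (const counter of ["packets", "bytes"]) {
      if (stats[counter] !== undefined) {
        prev.stats[counter] = (prev.stats[counter] ?? 0) + stats[counter];
      }
    }
  }
  return [...byKey.values()];
}

const rows = rollUp([
  // Derivation output for the two sample records of 50.31.185.39:80.
  { date: "2017-04-26", service: { ip: "50.31.185", port: 80 }, stats: { packets: 3, bytes: 674 } },
  { date: "2017-04-26", service: { ip: "50.31.185", port: 80 }, stats: { packets: 1, bytes: 0 } },
  // A hypothetical document for a different service on the same day.
  { date: "2017-04-26", service: { ip: "10.200.7", port: 443 }, stats: { packets: 5, bytes: 1076 } },
]);

console.log(JSON.stringify(rows));
```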