Network Traces
We’ll tackle a few network monitoring tasks, using data drawn from this Kaggle Dataset of collected network traces.
Caution
This example is a work in progress, and currently more of a sketch. Contributions welcome!
Trace Dataset
Our source dataset contains network traces of source & destination endpoints, packet flows, and bytes – much like what you’d obtain from tcpdump. It has many repetitions, including records having the same pair of endpoints and timestamp.
Source.IP,Source.Port,Destination.IP,Destination.Port,Protocol,Timestamp,Flow.Duration,Total.Fwd.Packets,Total.Backward.Packets,Total.Length.of.Fwd.Packets,Total.Length.of.Bwd.Packets
172.19.1.46,52422,10.200.7.7,3128,6,26/04/201711:11:17,45523,22,55,132,110414
10.200.7.7,3128,172.19.1.46,52422,6,26/04/201711:11:17,1,2,0,12,0
50.31.185.39,80,10.200.7.217,38848,6,26/04/201711:11:17,1,3,0,674,0
50.31.185.39,80,10.200.7.217,38848,6,26/04/201711:11:17,217,1,3,0,0
192.168.72.43,55961,10.200.7.7,3128,6,26/04/201711:11:17,78068,5,0,1076,0
10.200.7.6,3128,172.19.1.56,50004,6,26/04/201711:11:17,105069,136,0,313554,0
192.168.72.43,55963,10.200.7.7,3128,6,26/04/201711:11:17,104443,5,0,1076,0
192.168.10.47,51848,10.200.7.6,3128,6,26/04/201711:11:17,11002,3,12,232,3664
Capturing Peer-to-Peer Flows
We don’t necessarily want a collection to model the same level of granularity that’s present in the source dataset. Cloud storage is cheap, but we don’t need or want multiple records per second for each address pair: every one of them is data we’d have to examine each time we process the collection.
Instead we can key on address pairs, and lean on reduction annotations to aggregate any repeat records that may occur within a single ingestion transaction. Here’s a schema:
$defs:
  counter:
    type: number
    reduce: { strategy: sum }
  ip-port:
    type: object
    properties:
      ip: { type: string }
      port: { type: integer }
    required: [ip, port]
  stats:
    type: object
    reduce: { strategy: merge }
    properties:
      packets: { $ref: "#/$defs/counter" }
      bytes: { $ref: "#/$defs/counter" }
type: object
reduce: { strategy: merge }
properties:
  src: { $ref: "#/$defs/ip-port" }
  dst: { $ref: "#/$defs/ip-port" }
  timestamp: { type: string }
  protocol: { enum: [0, 6, 17] }
  millis: { $ref: "#/$defs/counter" }
  fwd: { $ref: "#/$defs/stats" }
  bwd: { $ref: "#/$defs/stats" }
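To get a feel for what the sum and merge annotations do to repeat records, here is a minimal sketch in plain JavaScript. It is a hand-written approximation, not Flow's reducer: the function names are hypothetical, missing counters are treated as zero, and properties without an annotation are assumed to take the most recent value. The inputs are the two sample rows for 50.31.185.39:80 and 10.200.7.217:38848 from the dataset excerpt.

```javascript
// Illustrative approximation of the schema's reduce annotations.
// This is not Flow's reducer; it only mimics `sum` and `merge` by hand.

// `counter` values use strategy `sum`; a missing side counts as zero.
function sumCounter(lhs, rhs) {
  return (lhs || 0) + (rhs || 0);
}

// `stats` objects use strategy `merge`: combine property by property.
function mergeStats(lhs = {}, rhs = {}) {
  return {
    packets: sumCounter(lhs.packets, rhs.packets),
    bytes: sumCounter(lhs.bytes, rhs.bytes),
  };
}

// The root document also merges. Properties without an annotation are
// assumed here to take the right-hand (most recent) value.
function reducePair(lhs, rhs) {
  return {
    ...lhs,
    ...rhs,
    millis: sumCounter(lhs.millis, rhs.millis),
    fwd: mergeStats(lhs.fwd, rhs.fwd),
    bwd: mergeStats(lhs.bwd, rhs.bwd),
  };
}

// Two sample rows sharing the same endpoints and timestamp.
const first = {
  src: { ip: "50.31.185.39", port: 80 },
  dst: { ip: "10.200.7.217", port: 38848 },
  protocol: 6,
  timestamp: "26/04/201711:11:17",
  millis: 1,
  fwd: { packets: 3, bytes: 674 },
  bwd: { packets: 0, bytes: 0 },
};
const second = {
  ...first,
  millis: 217,
  fwd: { packets: 1, bytes: 0 },
  bwd: { packets: 3, bytes: 0 },
};

const reduced = reducePair(first, second);
console.log(JSON.stringify(reduced, null, 2));
```

The two rows collapse into a single document whose duration, packet, and byte counters are the sums of the inputs, which is the aggregation the schema asks for.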
And a captured collection into which we’ll ingest:
collections:
  - name: examples/net-trace/pairs
    key: [/src/ip, /src/port, /dst/ip, /dst/port]
    schema:
      $ref: schema.yaml
      required: [src, dst, protocol, timestamp]
    projections:
      Source.IP: /src/ip
      Source.Port: /src/port
      Destination.IP: /dst/ip
      Destination.Port: /dst/port
      Protocol:
        location: /protocol
        partition: true
      Timestamp: /timestamp
      Flow.Duration: /millis
      Total.Fwd.Packets: /fwd/packets
      Total.Backward.Packets: /bwd/packets
      Total.Length.of.Fwd.Packets: /fwd/bytes
      Total.Length.of.Bwd.Packets: /bwd/bytes
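The projections relate each flat CSV column to a location in the nested document. As a rough illustration of that mapping only, the sketch below builds a document from one CSV row in plain JavaScript. The helpers `setAt` and `rowToDocument` are hypothetical, and converting every non-address, non-timestamp column to a number is an assumption made for the example; how Flow itself parses and coerces CSV input is not covered here.

```javascript
// Column name -> document location, copied from the projections above.
const PROJECTIONS = {
  "Source.IP": "/src/ip",
  "Source.Port": "/src/port",
  "Destination.IP": "/dst/ip",
  "Destination.Port": "/dst/port",
  "Protocol": "/protocol",
  "Timestamp": "/timestamp",
  "Flow.Duration": "/millis",
  "Total.Fwd.Packets": "/fwd/packets",
  "Total.Backward.Packets": "/bwd/packets",
  "Total.Length.of.Fwd.Packets": "/fwd/bytes",
  "Total.Length.of.Bwd.Packets": "/bwd/bytes",
};

// Assumption: these columns stay strings, everything else is numeric.
const STRING_COLUMNS = new Set(["Source.IP", "Destination.IP", "Timestamp"]);

// Hypothetical helper: set `value` at a location such as "/src/ip",
// creating intermediate objects as needed.
function setAt(doc, location, value) {
  const parts = location.split("/").slice(1);
  let node = doc;
  for (const part of parts.slice(0, -1)) {
    if (node[part] === undefined) node[part] = {};
    node = node[part];
  }
  node[parts[parts.length - 1]] = value;
  return doc;
}

// Hypothetical helper: turn one CSV row into a nested document.
function rowToDocument(header, row) {
  const names = header.split(",");
  const values = row.split(",");
  const doc = {};
  names.forEach((name, i) => {
    const location = PROJECTIONS[name];
    if (location === undefined) return; // Skip columns with no projection.
    const value = STRING_COLUMNS.has(name) ? values[i] : Number(values[i]);
    setAt(doc, location, value);
  });
  return doc;
}

const header =
  "Source.IP,Source.Port,Destination.IP,Destination.Port,Protocol,Timestamp," +
  "Flow.Duration,Total.Fwd.Packets,Total.Backward.Packets," +
  "Total.Length.of.Fwd.Packets,Total.Length.of.Bwd.Packets";
const row = "172.19.1.46,52422,10.200.7.7,3128,6,26/04/201711:11:17,45523,22,55,132,110414";

const doc = rowToDocument(header, row);
console.log(JSON.stringify(doc, null, 2));
```

The resulting document has the `src`, `dst`, `fwd`, and `bwd` objects that the schema describes, and its key is the tuple of `/src/ip`, `/src/port`, `/dst/ip`, and `/dst/port`.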
Kick off streamed capture:
# Start a local development instance, and leave it running:
$ flowctl develop
# In another terminal:
$ examples/net-trace/load-traces.sh
Service Traffic by Day
Here’s a simplistic view that identifies services (endpoints with a port under 1024) and aggregates their network traffic by day:
import:
  - pairs.flow.yaml
# Package lambdas with an NPM dependency on Moment.js.
nodeDependencies:
  moment: "^2.24"
collections:
  - name: examples/net-trace/services
    key: [/date, /service/ip, /service/port]
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        date: { type: string }
        service: { $ref: schema.yaml#/$defs/ip-port }
        stats: { $ref: schema.yaml#/$defs/stats }
      required: [date, service]
    derivation:
      transform:
        fromPairs:
          source: { name: examples/net-trace/pairs }
          publish:
            nodeJS: |
              // Use moment.js to deal with oddball timestamp format, and truncate to current date.
              let date = moment(source.timestamp, "DD/MM/YYYYhh:mm:ss").format('YYYY-MM-DD');
              let out = [];
              if (source.src.port < 1024) {
                // Drop the last octet, so the service is keyed on the first three octets of its IP.
                source.src.ip = source.src.ip.split('.').slice(0, -1).join('.');
                out.push({
                  date: date,
                  service: source.src,
                  stats: source.fwd,
                });
              }
              if (source.dst.port < 1024) {
                // Same truncation for a destination-side service.
                source.dst.ip = source.dst.ip.split('.').slice(0, -1).join('.');
                out.push({
                  date: date,
                  service: source.dst,
                  stats: source.bwd,
                });
              }
              return out;
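To experiment with the lambda's logic outside of Flow, a standalone approximation might look like the following. It is a sketch, not the derivation itself: `toDate` and `publish` are hypothetical names, the Moment.js call is swapped for fixed-position string slicing (which assumes well-formed timestamps), and the input document is left unmodified rather than edited in place.

```javascript
// Stand-in for the moment.js call: read day, month, and year by position
// from "DD/MM/YYYY..." and return "YYYY-MM-DD". Assumes well-formed input.
function toDate(timestamp) {
  const day = timestamp.slice(0, 2);
  const month = timestamp.slice(3, 5);
  const year = timestamp.slice(6, 10);
  return `${year}-${month}-${day}`;
}

// Drop the last octet of a dotted IPv4 address, e.g. "10.200.7.7" -> "10.200.7".
function truncateIp(ip) {
  return ip.split(".").slice(0, -1).join(".");
}

// Approximation of the publish lambda: emit one document per endpoint
// whose port is under 1024.
function publish(source) {
  const date = toDate(source.timestamp);
  const out = [];
  if (source.src.port < 1024) {
    out.push({
      date,
      service: { ip: truncateIp(source.src.ip), port: source.src.port },
      stats: source.fwd,
    });
  }
  if (source.dst.port < 1024) {
    out.push({
      date,
      service: { ip: truncateIp(source.dst.ip), port: source.dst.port },
      stats: source.bwd,
    });
  }
  return out;
}

// A pair document built from one of the sample rows (source port 80).
const webPair = {
  src: { ip: "50.31.185.39", port: 80 },
  dst: { ip: "10.200.7.217", port: 38848 },
  protocol: 6,
  timestamp: "26/04/201711:11:17",
  millis: 1,
  fwd: { packets: 3, bytes: 674 },
  bwd: { packets: 0, bytes: 0 },
};

// A pair where neither port is under 1024 (3128 is a proxy port).
const proxyPair = {
  ...webPair,
  src: { ip: "172.19.1.46", port: 52422 },
  dst: { ip: "10.200.7.7", port: 3128 },
};

const webOut = publish(webPair);
const proxyOut = publish(proxyPair);
console.log(JSON.stringify(webOut), JSON.stringify(proxyOut));
```

Note that most of the sample rows involve port 3128, which is not under 1024, so under this definition of a service they contribute nothing to the derived collection.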
Materialize to a test database:
$ flowctl materialize --collection examples/net-trace/services --table-name services --target testDB