Query Languages & Data Analysis
Network Flow Analysis with Kusto Query Language (KQL)
Problem
Network flow logs contain vast amounts of connection data including source/destination IPs, ports, byte counts, and packet information. Security analysts need to identify top talkers, suspicious connections, unusual traffic patterns, and encoded data hidden in flow metadata. Manual analysis of millions of flow records is impractical.
Solution
Use KQL to analyze network flow logs with powerful aggregation, filtering, and pattern matching capabilities. Import flow logs into Azure Data Explorer or use local Kusto tools. KQL's pipe-based syntax enables building complex analysis workflows by chaining simple operations.
Implementation
Top Talker by Source IP:
NetworkFlowTable
| summarize TotalBytes = sum(BytesSrcToDest + BytesDestToSrc) by SrcPublicIps
| top 1 by TotalBytes desc
| project SrcPublicIps
IPs Using Specific Ports:
NetworkFlowTable
| where DestPort in (80, 443)
| summarize Ports = make_set(DestPort) by SrcPublicIps
| where array_length(Ports) == 2
| project SrcPublicIps
Decode Base64 Packet Data:
NetworkFlowTable
| where SrcPublicIps == "target_ip"
| where isnotempty(packet_data)
| project DecodedData = base64_decode_tostring(packet_data)
| distinct DecodedData
Common KQL Operators:
// Filtering
| where DestPort == 80
| where SrcPublicIps in ("1.2.3.4", "5.6.7.8")
| where FlowStartTime > ago(7d)
// Aggregation
| summarize count() by SrcPublicIps
| summarize sum(TotalBytes), avg(PacketCount) by Protocol
| summarize make_list(DestIp) by SrcPublicIps
// Sorting / selection
| sort by TotalBytes desc
| top 10 by FlowCount
| distinct SrcPublicIps
// Derived columns
| extend BytesMB = BytesSrcToDest / 1048576.0
| extend DecodedPacket = base64_decode_tostring(packet_data)
Security Use Cases:
// Data exfiltration detection
NetworkFlowTable
| where BytesSrcToDest > 10000000
| where DestPort not in (80, 443)
| summarize TotalExfiltrated = sum(BytesSrcToDest) by SrcPublicIps, DestIp, DestPort
| order by TotalExfiltrated desc
// Port scanning identification
NetworkFlowTable
| summarize UniqueDestPorts = dcount(DestPort) by SrcPublicIps
| where UniqueDestPorts > 100
| order by UniqueDestPorts desc
// Beacon detection
NetworkFlowTable
| where SrcPublicIps == "suspicious_ip"
| summarize FlowTimes = make_list(FlowStartTime) by DestIp
Best Practices: Filter early in the pipeline, use specific time ranges on large datasets, use let statements for reusable components, test on small samples first.