# ids_iforest/pcap2flows.py
1"""Convert a PCAP file into a flows CSV file.
3This script reads packets from a PCAP using PyShark, aggregates them
4into bidirectional flows using the same logic as detection, and writes
5the resulting feature vectors to a CSV. It is useful for preparing
6training data from offline captures.
7"""

from __future__ import annotations

import argparse
from typing import Dict, Any, Tuple, Optional

try:
    import pyshark  # type: ignore
except Exception:
    pyshark = None  # type: ignore

import pandas as pd  # type: ignore

from .utils import load_config, aggregate_packets_to_flows, flows_to_dataframe

__all__ = ["main"]


def pcap_to_dataframe(
    pcap_path: str,
    cfg: Dict[str, Any],
) -> pd.DataFrame:
    """Read a PCAP and return a DataFrame of aggregated flows."""
    if pyshark is None:
        raise RuntimeError("pyshark is not installed; cannot process PCAPs")
    window = cfg["window_seconds"]
    feature_set = cfg.get("feature_set", "extended")
    # Stream the capture without retaining packets in memory
    # (keep_packets=False); force dissection of TCP port 80 as HTTP so
    # web flows expose HTTP-layer fields.
    cap = pyshark.FileCapture(
        pcap_path,
        only_summaries=False,
        keep_packets=False,
        decode_as={"tcp.port==80": "http"},
    )
    flows: Dict[Tuple[int, Tuple[Any, Any, str]], Dict[str, Any]] = {}
    base_ts: Optional[float] = None
    for pkt in cap:
        try:
            ts = float(pkt.frame_info.time_epoch)
        except Exception:
            # Skip packets without a parsable capture timestamp.
            continue
        if base_ts is None:
            base_ts = ts
        # Aggregate this single packet, then merge its per-flow statistics
        # into the running totals under the same flow key.
        f = aggregate_packets_to_flows([pkt], window_seconds=window, base_ts=base_ts)
        for k, st in f.items():
            if k in flows:
                existing = flows[k]
                existing["packets"] += st["packets"]
                existing["bytes"] += st["bytes"]
                existing["sizes"].extend(st["sizes"])
                existing["tcp_syn"] += st["tcp_syn"]
                existing["tcp_fin"] += st["tcp_fin"]
                existing["tcp_rst"] += st["tcp_rst"]
                existing["iat"].extend(st["iat"])
                existing["first_ts"] = min(existing["first_ts"], st["first_ts"])
                existing["last_ts"] = max(existing["last_ts"], st["last_ts"])
            else:
                flows[k] = st
    cap.close()
    df = flows_to_dataframe(flows, feature_set)
    return df
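
# A minimal programmatic-use sketch (hedged: the config values below are
# illustrative rather than project defaults; keys beyond the two read
# above are ignored by this function):
#
#     from ids_iforest.pcap2flows import pcap_to_dataframe
#
#     cfg = {"window_seconds": 10, "feature_set": "extended"}
#     df = pcap_to_dataframe("capture.pcap", cfg)
#     print(df.head())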


def main() -> None:
    """Entry point for the ids-iforest-pcap2csv console script."""
    ap = argparse.ArgumentParser(description="Aggregate flows from a PCAP into a CSV")
    ap.add_argument(
        "--config", default="config/config.yml", help="Path to configuration YAML file"
    )
    ap.add_argument("--pcap", required=True, help="PCAP file to process")
    ap.add_argument("--out", required=True, help="Output CSV file")
    args = ap.parse_args()
    cfg = load_config(args.config)
    df = pcap_to_dataframe(args.pcap, cfg)
    df.to_csv(args.out, index=False)
    print(f"Wrote {len(df)} flows to {args.out}")


if __name__ == "__main__":  # pragma: no cover
    main()
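
# Example invocations (paths are illustrative):
#
#     ids-iforest-pcap2csv --config config/config.yml \
#         --pcap captures/day1.pcap --out data/day1_flows.csv
#
# Or run the module directly, which works because of the __main__ guard:
#
#     python -m ids_iforest.pcap2flows --pcap day1.pcap --out day1_flows.csv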