Coverage for ids_iforest/capture.py: 0%
68 statements
coverage.py v7.10.6, created at 2025-09-03 16:19 +0000
1"""Capture live traffic and export aggregated flows to a CSV file for training.
3This module provides a simple command‑line tool to sniff packets on a
4network interface, aggregate them into bidirectional flows and write
5the resulting feature vectors to a CSV file. It is intended for
6gathering benign or malicious training data for the Isolation Forest
7model.
9Each row in the output CSV corresponds to a flow and contains the
10columns defined by ``flows_to_dataframe`` plus an optional ``label``
11column. When ``--label`` is provided, all rows in the dataset will
12have that value; otherwise the label column is omitted.
14Usage example::
16 ids-iforest-capture --minutes 5 --out flows_benign.csv --label 0
18"""

from __future__ import annotations

import argparse
import time
from typing import Optional, Dict, Tuple, Any

try:
    import pyshark  # type: ignore
except Exception:  # pragma: no cover
    pyshark = None  # type: ignore

import pandas as pd  # type: ignore

from .utils import (
    load_config,
    get_logger,
    aggregate_packets_to_flows,
    flows_to_dataframe,
)

__all__ = ["main"]


def capture_flows(
    cfg: Dict[str, Any],
    duration_minutes: float,
    logger: Any,
) -> pd.DataFrame:
    """Capture live packets and return aggregated flows as a DataFrame.

    Packets are sniffed on the configured interface using PyShark until
    ``duration_minutes`` elapses. Flows are aggregated using the
    configured window length and feature set. At the end of the capture,
    all flows are converted to a DataFrame and returned.
    """
    if pyshark is None:
        raise RuntimeError("pyshark is not installed – cannot capture packets")
    interface = cfg.get("iface", "eth0")
    bpf_filter = cfg.get("bpf_filter", "tcp or udp")
    window = cfg["window_seconds"]
    feature_set = cfg.get("feature_set", "extended")
    cap = pyshark.LiveCapture(interface=interface, bpf_filter=bpf_filter)
    logger.info(
        f"Beginning capture on {interface} for {duration_minutes} minute(s) with window {window}s"
    )
    flows: Dict[Tuple[int, Tuple[Any, Any, str]], Dict[str, Any]] = {}
    base_ts: Optional[float] = None
    end_time = time.time() + duration_minutes * 60
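    # Packets are aggregated one at a time: each call to
    # aggregate_packets_to_flows below returns single-packet flow stats,
    # which are then merged into the running ``flows`` dict keyed by
    # window and flow identity.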
    try:
        for pkt in cap.sniff_continuously():
            now = time.time()
            if now >= end_time:
                break
            try:
                ts = float(pkt.frame_info.time_epoch)
            except Exception:
                continue
            if base_ts is None:
                base_ts = ts
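            # ``base_ts`` is fixed at the first packet's timestamp and passed
            # to every per-packet aggregation call so that window boundaries
            # stay consistent across the whole capture.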
            # Aggregate this single packet
            f = aggregate_packets_to_flows(
                [pkt], window_seconds=window, base_ts=base_ts
            )
            for k, st in f.items():
                if k in flows:
                    existing = flows[k]
                    existing["packets"] += st["packets"]
                    existing["bytes"] += st["bytes"]
                    existing["sizes"].extend(st["sizes"])
                    existing["tcp_syn"] += st["tcp_syn"]
                    existing["tcp_fin"] += st["tcp_fin"]
                    existing["tcp_rst"] += st["tcp_rst"]
                    existing["iat"].extend(st["iat"])
                    existing["first_ts"] = min(existing["first_ts"], st["first_ts"])
                    existing["last_ts"] = max(existing["last_ts"], st["last_ts"])
                else:
                    flows[k] = st
    finally:
        cap.close()
    df = flows_to_dataframe(flows, feature_set)
    return df
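

# A minimal programmatic sketch of using capture_flows (illustrative only;
# the config keys mirror those read above, and the half-minute duration is
# a hypothetical value):
#
#     cfg = load_config("config/config.yml")
#     logger = get_logger("capture", cfg["logs_dir"], "capture.log")
#     df = capture_flows(cfg, duration_minutes=0.5, logger=logger)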


def write_dataset(df: pd.DataFrame, out_csv: str, label: Optional[int] = None) -> None:
    """Write the aggregated flows DataFrame to a CSV file.

    When ``label`` is provided, a ``label`` column is added with that
    constant value. The CSV header is always written.
    """
    if label is not None:
        df = df.copy()
        df["label"] = int(label)
    df.to_csv(out_csv, index=False)
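

# For example (hypothetical output path; ``label=0`` marks every flow as
# benign, matching the CLI usage example in the module docstring):
#
#     write_dataset(df, "flows_benign.csv", label=0)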


def main() -> None:
    """Entry point for the ``ids-iforest-capture`` console script."""
    ap = argparse.ArgumentParser(
        description="Capture live packets and aggregate flows for training"
    )
    ap.add_argument(
        "--config", default="config/config.yml", help="Path to configuration YAML file"
    )
    ap.add_argument(
        "--minutes", type=float, default=1.0, help="Duration to capture in minutes"
    )
    ap.add_argument(
        "--out", required=True, help="Output CSV path for the aggregated flows"
    )
    ap.add_argument(
        "--label",
        type=int,
        default=None,
        help="Optional label (0=benign, 1=malicious) to assign to all flows",
    )
    args = ap.parse_args()
    cfg = load_config(args.config)
    logger = get_logger("capture", cfg["logs_dir"], "capture.log")
    df = capture_flows(cfg, args.minutes, logger)
    if df.empty:
        logger.info("No flows captured; dataset will be empty")
    write_dataset(df, args.out, args.label)
    logger.info(f"Captured {len(df)} flow(s); written to {args.out}")


if __name__ == "__main__":  # pragma: no cover
    main()