Coverage for ids_iforest/pcap2flows.py: 0%

52 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 16:19 +0000

1"""Convert a PCAP file into a flows CSV file. 

2 

3This script reads packets from a PCAP using PyShark, aggregates them 

4into bidirectional flows using the same logic as detection, and writes 

5the resulting feature vectors to a CSV. It is useful for preparing 

6training data from offline captures. 

7""" 

8 

9from __future__ import annotations 

10 

11import argparse 

12from typing import Dict, Any, Tuple, Optional 

13 

14try: 

15 import pyshark # type: ignore 

16except Exception: 

17 pyshark = None # type: ignore 

18 

19import pandas as pd # type: ignore 

20 

21from .utils import load_config, aggregate_packets_to_flows, flows_to_dataframe 

22 

23__all__ = ["main"] 

24 

25 

def pcap_to_dataframe(
    pcap_path: str,
    cfg: Dict[str, Any],
) -> pd.DataFrame:
    """Read a PCAP and return a DataFrame of aggregated flows.

    Parameters
    ----------
    pcap_path:
        Path to the capture file read with PyShark.
    cfg:
        Configuration mapping. ``window_seconds`` is required;
        ``feature_set`` is optional and defaults to ``"extended"``.

    Returns
    -------
    pd.DataFrame
        One row per aggregated bidirectional flow, as produced by
        :func:`flows_to_dataframe`.

    Raises
    ------
    RuntimeError
        If pyshark is not installed.
    """
    if pyshark is None:
        raise RuntimeError("pyshark is not installed – cannot process PCAPs")
    window = cfg["window_seconds"]
    feature_set = cfg.get("feature_set", "extended")
    cap = pyshark.FileCapture(
        pcap_path,
        only_summaries=False,
        keep_packets=False,
        decode_as={"tcp.port==80": "http"},
    )
    flows: Dict[Tuple[int, Tuple[Any, Any, str]], Dict[str, Any]] = {}
    base_ts: Optional[float] = None
    # Ensure the capture (and its underlying tshark subprocess) is closed
    # even if iteration raises — previously an exception mid-loop leaked it.
    try:
        for pkt in cap:
            try:
                ts = float(pkt.frame_info.time_epoch)
            except Exception:
                # A packet without a usable timestamp cannot be windowed; skip it.
                continue
            if base_ts is None:
                base_ts = ts
            # Aggregate this single packet, then merge into the running flows.
            # NOTE(review): aggregating one packet at a time means st["iat"]
            # can only hold intra-call inter-arrival times — cross-packet IATs
            # are not reconstructed by this merge. Confirm against
            # aggregate_packets_to_flows before relying on the iat feature.
            f = aggregate_packets_to_flows([pkt], window_seconds=window, base_ts=base_ts)
            for k, st in f.items():
                existing = flows.get(k)
                if existing is None:
                    flows[k] = st
                    continue
                # Merge the single-packet stats into the accumulated flow.
                existing["packets"] += st["packets"]
                existing["bytes"] += st["bytes"]
                existing["sizes"].extend(st["sizes"])
                existing["tcp_syn"] += st["tcp_syn"]
                existing["tcp_fin"] += st["tcp_fin"]
                existing["tcp_rst"] += st["tcp_rst"]
                existing["iat"].extend(st["iat"])
                existing["first_ts"] = min(existing["first_ts"], st["first_ts"])
                existing["last_ts"] = max(existing["last_ts"], st["last_ts"])
    finally:
        cap.close()
    return flows_to_dataframe(flows, feature_set)

69 

70 

def main() -> None:
    """Entry point for ids-iforest-pcap2csv console script.

    Parses command-line options, aggregates the flows of the given PCAP,
    and writes them as CSV rows to the requested output path.
    """
    parser = argparse.ArgumentParser(
        description="Aggregate flows from a PCAP into a CSV"
    )
    parser.add_argument(
        "--config",
        default="config/config.yml",
        help="Path to configuration YAML file",
    )
    parser.add_argument("--pcap", required=True, help="PCAP file to process")
    parser.add_argument("--out", required=True, help="Output CSV file")
    opts = parser.parse_args()

    config = load_config(opts.config)
    frame = pcap_to_dataframe(opts.pcap, config)
    frame.to_csv(opts.out, index=False)
    print(f"Wrote {len(frame)} flows to {opts.out}")

84 

85 

# Allow running the module directly (e.g. `python -m ids_iforest.pcap2flows`).
if __name__ == "__main__":  # pragma: no cover
    main()