Coverage for ids_iforest/capture.py: 0%

68 statements  

coverage.py v7.10.6, created at 2025-09-03 16:19 +0000

1"""Capture live traffic and export aggregated flows to a CSV file for training. 

2 

3This module provides a simple command‑line tool to sniff packets on a 

4network interface, aggregate them into bidirectional flows and write 

5the resulting feature vectors to a CSV file. It is intended for 

6gathering benign or malicious training data for the Isolation Forest 

7model. 

8 

9Each row in the output CSV corresponds to a flow and contains the 

10columns defined by ``flows_to_dataframe`` plus an optional ``label`` 

11column. When ``--label`` is provided, all rows in the dataset will 

12have that value; otherwise the label column is omitted. 

13 

14Usage example:: 

15 

16 ids-iforest-capture --minutes 5 --out flows_benign.csv --label 0 

17 

18""" 

from __future__ import annotations

import argparse
import time
from typing import Optional, Dict, Tuple, Any

try:
    import pyshark  # type: ignore
except Exception:  # pragma: no cover
    pyshark = None  # type: ignore

import pandas as pd  # type: ignore

from .utils import (
    load_config,
    get_logger,
    aggregate_packets_to_flows,
    flows_to_dataframe,
)

__all__ = ["main"]


def capture_flows(
    cfg: Dict[str, Any],
    duration_minutes: float,
    logger: Any,
) -> pd.DataFrame:
    """Capture live packets and return aggregated flows as a DataFrame.

    Packets are sniffed on the configured interface using PyShark until
    ``duration_minutes`` elapses. Flows are aggregated using the
    configured window length and feature set. At the end of capture
    all flows are converted to a DataFrame and returned.
    """
    if pyshark is None:
        raise RuntimeError("pyshark is not installed – cannot capture packets")
    interface = cfg.get("iface", "eth0")
    bpf_filter = cfg.get("bpf_filter", "tcp or udp")
    window = cfg["window_seconds"]
    feature_set = cfg.get("feature_set", "extended")
    cap = pyshark.LiveCapture(interface=interface, bpf_filter=bpf_filter)
    logger.info(
        f"Beginning capture on {interface} for {duration_minutes} minute(s) with window {window}s"
    )
    flows: Dict[Tuple[int, Tuple[Any, Any, str]], Dict[str, Any]] = {}
    base_ts: Optional[float] = None
    end_time = time.time() + duration_minutes * 60
    try:
        for pkt in cap.sniff_continuously():
            now = time.time()
            if now >= end_time:
                break
            try:
                ts = float(pkt.frame_info.time_epoch)
            except Exception:
                continue
            if base_ts is None:
                base_ts = ts
            # Aggregate this single packet
            f = aggregate_packets_to_flows(
                [pkt], window_seconds=window, base_ts=base_ts
            )
            for k, st in f.items():
                if k in flows:
                    existing = flows[k]
                    existing["packets"] += st["packets"]
                    existing["bytes"] += st["bytes"]
                    existing["sizes"].extend(st["sizes"])
                    existing["tcp_syn"] += st["tcp_syn"]
                    existing["tcp_fin"] += st["tcp_fin"]
                    existing["tcp_rst"] += st["tcp_rst"]
                    existing["iat"].extend(st["iat"])
                    existing["first_ts"] = min(existing["first_ts"], st["first_ts"])
                    existing["last_ts"] = max(existing["last_ts"], st["last_ts"])
                else:
                    flows[k] = st
    finally:
        cap.close()
    df = flows_to_dataframe(flows, feature_set)
    return df
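
# Example (a hedged sketch, not part of the module): calling ``capture_flows``
# directly from Python. The config keys are the ones the function reads; the
# values here are illustrative assumptions.
#
#     cfg = {
#         "iface": "eth0",
#         "bpf_filter": "tcp or udp",
#         "window_seconds": 10,
#         "feature_set": "extended",
#         "logs_dir": "logs",
#     }
#     logger = get_logger("capture", cfg["logs_dir"], "capture.log")
#     df = capture_flows(cfg, duration_minutes=1.0, logger=logger)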


def write_dataset(df: pd.DataFrame, out_csv: str, label: Optional[int] = None) -> None:
    """Write the aggregated flows DataFrame to a CSV file.

    When ``label`` is provided, a ``label`` column is added with that
    constant value. The CSV header is always written.
    """
    if label is not None:
        df = df.copy()
        df["label"] = int(label)
    df.to_csv(out_csv, index=False)
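
# Example (hedged): labelling a captured dataset as benign (0) before writing
# it out. ``flows_benign.csv`` is an illustrative path taken from the module
# docstring.
#
#     write_dataset(df, "flows_benign.csv", label=0)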


def main() -> None:
    """Entry point for the ``ids-iforest-capture`` console script."""
    ap = argparse.ArgumentParser(
        description="Capture live packets and aggregate flows for training"
    )
    ap.add_argument(
        "--config", default="config/config.yml", help="Path to configuration YAML file"
    )
    ap.add_argument(
        "--minutes", type=float, default=1.0, help="Duration to capture in minutes"
    )
    ap.add_argument(
        "--out", required=True, help="Output CSV path for the aggregated flows"
    )
    ap.add_argument(
        "--label",
        type=int,
        default=None,
        help="Optional label (0=benign, 1=malicious) to assign to all flows",
    )
    args = ap.parse_args()
    cfg = load_config(args.config)
    logger = get_logger("capture", cfg["logs_dir"], "capture.log")
    df = capture_flows(cfg, args.minutes, logger)
    if df.empty:
        logger.info("No flows captured; dataset will be empty")
    write_dataset(df, args.out, args.label)
    logger.info(f"Wrote {len(df)} captured flow(s) to {args.out}")


if __name__ == "__main__":  # pragma: no cover
    main()
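
# Example invocation (assumes the ``ids_iforest`` package is importable, so
# the module can also run via ``-m`` instead of the console script):
#
#     python -m ids_iforest.capture --minutes 5 --out flows_benign.csv --label 0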