Coverage for ids_iforest/scripts/prepare_csecic2018.py: 0%

197 statements  

coverage.py v7.10.6, created at 2025-09-03 16:19 +0000

# ids_iforest/scripts/prepare_csecic2018.py
import argparse
import glob
from typing import Dict, Optional, Tuple

import numpy as np
import pandas as pd

# Protocol mapping
PROTO_MAP = {6: "tcp", 17: "udp"}

# Acceptable (case-insensitive) synonyms for each logical field
SYNONYMS = {
    "flow_id": ["flow id", "flowid"],
    "src_ip": ["src ip", "source ip", "src_ip"],
    "dst_ip": ["dst ip", "destination ip", "dst_ip"],
    "src_port": ["src port", "source port", "src_port"],
    "dst_port": ["dst port", "destination port", "dst_port"],
    "protocol": ["protocol"],
    # durations & IAT (CICFlowMeter reports these in microseconds)
    "flow_duration_us": ["flow duration", "flow duration(us)", "flow_duration"],
    "flow_iat_mean_us": ["flow iat mean", "flow iat mean(us)", "flow_iat_mean"],
    "flow_iat_std_us": ["flow iat std", "flow iat std(us)", "flow_iat_std"],
    # packets / bytes (abbreviated vs. long column names)
    "fwd_pkts": ["tot fwd pkts", "total fwd packets", "fwd packets", "fwd_pkts"],
    "bwd_pkts": ["tot bwd pkts", "total backward packets", "bwd packets", "bwd_pkts"],
    "fwd_bytes": [
        "totlen fwd pkts",
        "total length of fwd packets",
        "fwd bytes",
        "fwd_bytes",
    ],
    "bwd_bytes": [
        "totlen bwd pkts",
        "total length of bwd packets",
        "bwd bytes",
        "bwd_bytes",
    ],
    # flags
    "syn": ["syn flag cnt", "syn flag count", "syn"],
    "fin": ["fin flag cnt", "fin flag count", "fin"],
    "rst": ["rst flag cnt", "rst flag count", "rst"],
    # std packet size in CICFlowMeter ("Pkt Len Std")
    "pkt_len_std": ["pkt len std", "packet length std", "pkt length std"],
    "label_raw": ["label"],
}

# Patterns sometimes used by CICFlowMeter in Flow ID fields
FLOW_ID_PATTERNS = [
    r"^(?P<src_ip>[^:]+):(?P<src_port>\d+)-(?P<dst_ip>[^:]+):(?P<dst_port>\d+)-(?P<proto>.+)$",
    r"^(?P<src_ip>[^-]+)-(?P<dst_ip>[^-]+)-(?P<src_port>\d+)-(?P<dst_port>\d+)-(?P<proto>.+)$",
]
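# For illustration, Flow ID strings each pattern above is intended to match
# (addresses and ports are hypothetical):
#   "172.31.0.2:443-10.0.0.5:51884-6"   -> first pattern (colon-separated)
#   "172.31.0.2-10.0.0.5-443-51884-6"   -> second pattern (dash-separated)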

REQUIRED_CORE = [
    "protocol",
    "flow_duration_us",
    "fwd_pkts",
    "bwd_pkts",
    "fwd_bytes",
    "bwd_bytes",
    "label_raw",
]


def _normcols(cols):
    # lower/strip; replace "/" with a space; collapse whitespace
    norm = {}
    for c in cols:
        k = str(c).strip().lower().replace("/", " ")
        k = " ".join(k.split())
        norm[k] = c
    return norm


def _pick(norm_map: Dict[str, str], names):
    for n in names:
        k = " ".join(n.strip().lower().split())
        if k in norm_map:
            return norm_map[k]
    return None
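# Illustrative example of the two helpers together (the column names are
# hypothetical but follow the CICFlowMeter style):
#   norm = _normcols(["Flow Duration", " Tot Fwd Pkts "])
#   # -> {"flow duration": "Flow Duration", "tot fwd pkts": " Tot Fwd Pkts "}
#   _pick(norm, SYNONYMS["fwd_pkts"])  # -> " Tot Fwd Pkts "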


def _parse_flow_id(series: pd.Series):
    src_ip = pd.Series(["0.0.0.0"] * len(series))
    dst_ip = pd.Series(["0.0.0.0"] * len(series))
    src_port = pd.Series([0] * len(series))
    dst_port = pd.Series([0] * len(series))
    proto = pd.Series([None] * len(series))
    for pat in FLOW_ID_PATTERNS:
        m = series.astype(str).str.extract(pat, expand=True)
        # nothing matched this pattern; try the next one
        if m.isna().all().all():
            continue
        src_ip = m.get("src_ip", src_ip).fillna(src_ip)
        dst_ip = m.get("dst_ip", dst_ip).fillna(dst_ip)
        if "src_port" in m:
            src_port = (
                pd.to_numeric(m["src_port"], errors="coerce")
                .fillna(src_port)
                .astype(int)
            )
        if "dst_port" in m:
            dst_port = (
                pd.to_numeric(m["dst_port"], errors="coerce")
                .fillna(dst_port)
                .astype(int)
            )
        proto = m.get("proto", proto).fillna(proto)
        break
    return src_ip, dst_ip, src_port, dst_port, proto


def _select_columns(
    df: pd.DataFrame,
) -> Tuple[Dict[str, Optional[str]], Dict[str, str]]:
    norm = _normcols(df.columns)
    sel = {tgt: _pick(norm, srcs) for tgt, srcs in SYNONYMS.items()}
    missing = [k for k in REQUIRED_CORE if sel.get(k) is None]
    if missing:
        raise KeyError(f"missing required core columns: {missing}")
    return sel, norm


def _process_block(
    df: pd.DataFrame, sel: Dict[str, Optional[str]], norm: Dict[str, str]
) -> pd.DataFrame:
    out = pd.DataFrame(index=df.index)

    # ---- IP/ports ----
    # Try explicit columns; else parse Flow ID; else sentinels.
    if sel.get("src_ip") and sel.get("dst_ip"):
        out["src_ip"] = df[sel["src_ip"]].astype(str)
        out["dst_ip"] = df[sel["dst_ip"]].astype(str)
    elif sel.get("flow_id"):
        s_ip, d_ip, s_po, d_po, _ = _parse_flow_id(df[sel["flow_id"]])
        out["src_ip"] = s_ip
        out["dst_ip"] = d_ip
        out["src_port"] = s_po
        out["dst_port"] = d_po
    else:
        out["src_ip"] = "0.0.0.0"
        out["dst_ip"] = "0.0.0.0"

    # Ports (prefer explicit columns when present)
    if "src_port" not in out:
        if sel.get("src_port"):
            out["src_port"] = (
                pd.to_numeric(df[sel["src_port"]], errors="coerce")
                .fillna(0)
                .astype(int)
            )
        else:
            out["src_port"] = 0
    if "dst_port" not in out:
        # sometimes a "Dst Port" column exists even though "src_port" does not
        if sel.get("dst_port"):
            out["dst_port"] = (
                pd.to_numeric(df[sel["dst_port"]], errors="coerce")
                .fillna(0)
                .astype(int)
            )
        else:
            out["dst_port"] = 0

    # ---- Protocol ----
    proto_raw = df[sel["protocol"]]
    if np.issubdtype(proto_raw.dtype, np.number):
        out["protocol"] = proto_raw.map(PROTO_MAP).fillna("other")
    else:
        out["protocol"] = (
            proto_raw.astype(str)
            .str.lower()
            .str.strip()
            .replace({"6": "tcp", "17": "udp"})
        )
    out.loc[~out["protocol"].isin(["tcp", "udp"]), "protocol"] = "other"

    # ---- Durations / IAT ----
    eps = 1e-6
    flow_dur_us = pd.to_numeric(df[sel["flow_duration_us"]], errors="coerce")
    out["flow_duration"] = (flow_dur_us / 1e6).fillna(0.0).clip(lower=eps)

    if sel.get("flow_iat_mean_us"):
        out["iat_mean"] = (
            pd.to_numeric(df[sel["flow_iat_mean_us"]], errors="coerce") / 1e6
        ).fillna(0.0)
    else:
        out["iat_mean"] = 0.0
    if sel.get("flow_iat_std_us"):
        out["iat_std"] = (
            pd.to_numeric(df[sel["flow_iat_std_us"]], errors="coerce") / 1e6
        ).fillna(0.0)
    else:
        out["iat_std"] = 0.0

    # ---- pkts/bytes ----
    fwd_pkts = pd.to_numeric(df[sel["fwd_pkts"]], errors="coerce").fillna(0)
    bwd_pkts = pd.to_numeric(df[sel["bwd_pkts"]], errors="coerce").fillna(0)
    fwd_bytes = pd.to_numeric(df[sel["fwd_bytes"]], errors="coerce").fillna(0)
    bwd_bytes = pd.to_numeric(df[sel["bwd_bytes"]], errors="coerce").fillna(0)

    out["bidirectional_packets"] = (fwd_pkts + bwd_pkts).astype("int64").clip(lower=1)
    out["bidirectional_bytes"] = (fwd_bytes + bwd_bytes).astype("int64").clip(lower=0)
    out["mean_packet_size"] = out["bidirectional_bytes"] / out["bidirectional_packets"]

    # ---- std_packet_size from "Pkt Len Std" ----
    pkt_len_std_col = sel.get("pkt_len_std")
    if pkt_len_std_col:
        out["std_packet_size"] = pd.to_numeric(
            df[pkt_len_std_col], errors="coerce"
        ).fillna(0.0)
    else:
        out["std_packet_size"] = 0.0  # fallback (rarely needed)

    # ---- derived features expected by the trainer ----
    out["bytes_per_packet"] = (
        out["bidirectional_bytes"] / out["bidirectional_packets"]
    ).astype(float)
    out["packets_per_second"] = (
        out["bidirectional_packets"] / out["flow_duration"]
    ).astype(float)

    # ---- Flags ----
    for src_key, out_key in [
        ("syn", "tcp_syn_count"),
        ("fin", "tcp_fin_count"),
        ("rst", "tcp_rst_count"),
    ]:
        col = sel.get(src_key)
        if col:
            out[out_key] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int)
        else:
            out[out_key] = 0

    # ---- Label ----
    lbl = df[sel["label_raw"]].astype(str).str.strip().str.upper()
    out["label"] = (lbl != "BENIGN").astype(int)

    # Keep only tcp/udp
    out = out[out["protocol"].isin(["tcp", "udp"])]

    # Final column order (exactly what the trainer expects + id fields)
    keep = [
        "src_ip",
        "dst_ip",
        "src_port",
        "dst_port",
        "protocol",
        "flow_duration",
        "bidirectional_packets",
        "bidirectional_bytes",
        "mean_packet_size",
        "std_packet_size",
        "tcp_syn_count",
        "tcp_fin_count",
        "tcp_rst_count",
        "iat_mean",
        "iat_std",
        "bytes_per_packet",
        "packets_per_second",
        "label",
    ]
    for k in keep:
        if k not in out:
            # ensure the column exists with a sensible default
            if k.endswith("_ip"):
                out[k] = "0.0.0.0"
            elif k.endswith("_port"):
                out[k] = 0
            elif k == "protocol":
                out[k] = "other"
            elif k == "label":
                out[k] = 0
            else:
                out[k] = 0.0
    return out[keep]
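# A minimal sketch of the mapping above, assuming one CICFlowMeter-style row
# (all values are illustrative):
#   {"Protocol": 6, "Flow Duration": 2_000_000, "Tot Fwd Pkts": 10,
#    "Tot Bwd Pkts": 5, "TotLen Fwd Pkts": 1500, "TotLen Bwd Pkts": 500,
#    "Label": "Benign"}
# maps to protocol="tcp", flow_duration=2.0 (s), bidirectional_packets=15,
# bidirectional_bytes=2000, mean_packet_size≈133.3, packets_per_second=7.5,
# label=0 (BENIGN).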


def _usable_file_header(
    path: str,
) -> Optional[Tuple[Dict[str, Optional[str]], Dict[str, str]]]:
    # Read just the header row (nrows=1) to decide whether the file is usable
    # and to build the column selector.
    try:
        df_head = pd.read_csv(path, nrows=1, low_memory=False)
        sel, norm = _select_columns(df_head)
        return sel, norm
    except Exception as e:
        print("SKIP (header)", path, repr(e))
        return None


def process_file(path: str, target_rows: int, seed: int = 42) -> pd.DataFrame:
    # Stream in chunks and sample up to target_rows rows
    head = _usable_file_header(path)
    if head is None:
        return pd.DataFrame()
    sel, norm = head

    chunksize = 200_000  # tune if needed
    rng = np.random.default_rng(seed)
    taken = 0
    parts = []
    try:
        for chunk in pd.read_csv(path, chunksize=chunksize, low_memory=False):
            df_proc = _process_block(chunk, sel, norm)
            if target_rows <= 0:
                parts.append(df_proc)
                continue
            if taken >= target_rows:
                break
            need = target_rows - taken
            if len(df_proc) <= need:
                parts.append(df_proc)
                taken += len(df_proc)
            else:
                frac = need / len(df_proc)
                sample = df_proc.sample(
                    frac=frac, random_state=int(rng.integers(0, 2**31 - 1))
                )
                parts.append(sample)
                taken += len(sample)
                if taken >= target_rows:
                    break
    except Exception as e:
        print("SKIP (read)", path, repr(e))
        return pd.DataFrame()

    if not parts:
        return pd.DataFrame()
    return pd.concat(parts, ignore_index=True)
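# Example usage (the file name is illustrative):
#   df = process_file(
#       "data/raw/csecic2018/Friday-02-03-2018_TrafficForML_CICFlowMeter.csv",
#       target_rows=50_000,
#   )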


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--in_glob",
        required=True,
        help='e.g. "data/raw/csecic2018/*TrafficForML_CICFlowMeter*.csv"',
    )
    ap.add_argument("--out_csv", required=True)
    ap.add_argument(
        "--limit",
        type=int,
        default=0,
        help="max total rows across all files (sampled per file); 0 = take all (can be huge)",
    )
    args = ap.parse_args()

    files = sorted(glob.glob(args.in_glob))
    if not files:
        raise SystemExit(f"No files matched: {args.in_glob}")

    # Probe which files are usable
    usable = []
    for f in files:
        if _usable_file_header(f) is not None:
            usable.append(f)
        else:
            print("SKIP", f, "→ missing core columns")

    if not usable:
        raise SystemExit("No usable files found.")

    # Decide the per-file sampling target when limit > 0
    per_file_target = 0
    remainder = 0
    if args.limit > 0:
        per_file_target = args.limit // len(usable)
        remainder = args.limit % len(usable)
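    # e.g. --limit 10 over 3 usable files: per_file_target=3, remainder=1,
    # so the loop below assigns per-file targets 4, 3, 3.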

    parts = []
    for i, f in enumerate(usable):
        tgt = 0
        if args.limit > 0:
            tgt = per_file_target + (1 if i < remainder else 0)
        print(f"Processing {f} target_rows={tgt if tgt > 0 else 'ALL'}")
        df_part = process_file(f, tgt)
        if not df_part.empty:
            parts.append(df_part)
            print("OK", f, "rows:", len(df_part))
        else:
            print("SKIP", f, "→ no rows after processing")

    big = pd.concat(parts, ignore_index=True) if parts else pd.DataFrame()
    if big.empty:
        raise SystemExit("No data produced. Check inputs/columns.")

    big.to_csv(args.out_csv, index=False)
    print("Wrote", args.out_csv, "rows:", len(big))


if __name__ == "__main__":
    main()
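# Example invocation (paths are illustrative; the glob matches the --in_glob help text):
#   python ids_iforest/scripts/prepare_csecic2018.py \
#       --in_glob "data/raw/csecic2018/*TrafficForML_CICFlowMeter*.csv" \
#       --out_csv data/processed/csecic2018_flows.csv \
#       --limit 500000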