Coverage for ids_iforest/scripts/prepare_csecic2018.py: 0%
197 statements
coverage.py v7.10.6, created at 2025-09-03 16:19 +0000
# ids_iforest/scripts/prepare_csecic2018.py
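"""Prepare CSE-CIC-IDS2018 CICFlowMeter CSV exports for the ids_iforest trainer.

Reads raw CSVs matched by a glob, maps their heterogeneous column headers onto
logical fields via SYNONYMS, derives the feature columns the trainer expects
(durations in seconds, packet/byte aggregates, rates, TCP flag counts, binary
label), and writes one combined CSV, optionally sampling rows per file.
"""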
import argparse
import glob
import pandas as pd
import numpy as np
from typing import Dict, Optional, Tuple

# Protocol mapping
PROTO_MAP = {6: "tcp", 17: "udp"}

# Acceptable (case-insensitive) synonyms for each logical field
SYNONYMS = {
    "flow_id": ["flow id", "flowid"],
    "src_ip": ["src ip", "source ip", "src_ip"],
    "dst_ip": ["dst ip", "destination ip", "dst_ip"],
    "src_port": ["src port", "source port", "src_port"],
    "dst_port": ["dst port", "destination port", "dst_port"],
    "protocol": ["protocol"],
    # durations & IAT (CICFlowMeter reports microseconds)
    "flow_duration_us": ["flow duration", "flow duration(us)", "flow_duration"],
    "flow_iat_mean_us": ["flow iat mean", "flow iat mean(us)", "flow_iat_mean"],
    "flow_iat_std_us": ["flow iat std", "flow iat std(us)", "flow_iat_std"],
    # packets / bytes (abbreviated vs long-form headers)
    "fwd_pkts": ["tot fwd pkts", "total fwd packets", "fwd packets", "fwd_pkts"],
    "bwd_pkts": ["tot bwd pkts", "total backward packets", "bwd packets", "bwd_pkts"],
    "fwd_bytes": [
        "totlen fwd pkts",
        "total length of fwd packets",
        "fwd bytes",
        "fwd_bytes",
    ],
    "bwd_bytes": [
        "totlen bwd pkts",
        "total length of bwd packets",
        "bwd bytes",
        "bwd_bytes",
    ],
    # flags
    "syn": ["syn flag cnt", "syn flag count", "syn"],
    "fin": ["fin flag cnt", "fin flag count", "fin"],
    "rst": ["rst flag cnt", "rst flag count", "rst"],
    # std packet size in CICFlowMeter (Pkt Len Std)
    "pkt_len_std": ["pkt len std", "packet length std", "pkt length std"],
    "label_raw": ["label"],
}

# Patterns sometimes used by CICFlowMeter in Flow ID fields
FLOW_ID_PATTERNS = [
    r"^(?P<src_ip>[^:]+):(?P<src_port>\d+)-(?P<dst_ip>[^:]+):(?P<dst_port>\d+)-(?P<proto>.+)$",
    r"^(?P<src_ip>[^-]+)-(?P<dst_ip>[^-]+)-(?P<src_port>\d+)-(?P<dst_port>\d+)-(?P<proto>.+)$",
]
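# The first pattern matches "srcip:srcport-dstip:dstport-proto"; the second
# matches "srcip-dstip-srcport-dstport-proto".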

REQUIRED_CORE = [
    "protocol",
    "flow_duration_us",
    "fwd_pkts",
    "bwd_pkts",
    "fwd_bytes",
    "bwd_bytes",
    "label_raw",
]


def _normcols(cols):
    # lower/strip; replace "/" by space; collapse spaces
    norm = {}
    for c in cols:
        k = str(c).strip().lower().replace("/", " ")
        k = " ".join(k.split())
        norm[k] = c
    return norm


def _pick(norm_map: Dict[str, str], names):
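    """Return the original column name for the first synonym found in norm_map, else None."""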
    for n in names:
        k = " ".join(n.strip().lower().split())
        if k in norm_map:
            return norm_map[k]
    return None


def _parse_flow_id(series: pd.Series):
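    """Best-effort parse of a CICFlowMeter Flow ID series into (src_ip, dst_ip,
    src_port, dst_port, proto); fields that cannot be extracted keep sentinel
    values ("0.0.0.0", port 0, or None)."""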
    # Sentinels share the input's index so fillna/assignment align by row.
    src_ip = pd.Series(["0.0.0.0"] * len(series), index=series.index)
    dst_ip = pd.Series(["0.0.0.0"] * len(series), index=series.index)
    src_port = pd.Series([0] * len(series), index=series.index)
    dst_port = pd.Series([0] * len(series), index=series.index)
    proto = pd.Series([None] * len(series), index=series.index)
    for pat in FLOW_ID_PATTERNS:
        m = series.astype(str).str.extract(pat, expand=True)
        # if all NaN, try the next pattern
        if m.isna().all().all():
            continue
        src_ip = m.get("src_ip", src_ip).fillna(src_ip)
        dst_ip = m.get("dst_ip", dst_ip).fillna(dst_ip)
        if "src_port" in m:
            src_port = (
                pd.to_numeric(m["src_port"], errors="coerce")
                .fillna(src_port)
                .astype(int)
            )
        if "dst_port" in m:
            dst_port = (
                pd.to_numeric(m["dst_port"], errors="coerce")
                .fillna(dst_port)
                .astype(int)
            )
        proto = m.get("proto", proto).fillna(proto)
        break
    return src_ip, dst_ip, src_port, dst_port, proto


def _select_columns(
    df: pd.DataFrame,
) -> Tuple[Dict[str, Optional[str]], Dict[str, str]]:
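    """Resolve each logical field in SYNONYMS to an actual column of df (or
    None), raising KeyError if any REQUIRED_CORE field is missing."""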
    norm = _normcols(df.columns)
    sel = {tgt: _pick(norm, srcs) for tgt, srcs in SYNONYMS.items()}
    missing = [k for k in REQUIRED_CORE if sel.get(k) is None]
    if missing:
        raise KeyError(f"missing required core columns: {missing}")
    return sel, norm


def _process_block(
    df: pd.DataFrame, sel: Dict[str, Optional[str]], norm: Dict[str, str]
) -> pd.DataFrame:
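    """Normalize one raw block into the trainer's schema: identifiers, protocol,
    durations in seconds, packet/byte aggregates, derived rates, TCP flag
    counts, and a binary label (0 = BENIGN, 1 = anything else)."""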
    out = pd.DataFrame(index=df.index)

    # ---- IP/ports ----
    # Try explicit columns; else parse Flow ID; else sentinels.
    if sel.get("src_ip") and sel.get("dst_ip"):
        out["src_ip"] = df[sel["src_ip"]].astype(str)
        out["dst_ip"] = df[sel["dst_ip"]].astype(str)
    elif sel.get("flow_id"):
        s_ip, d_ip, s_po, d_po, _ = _parse_flow_id(df[sel["flow_id"]])
        out["src_ip"] = s_ip
        out["dst_ip"] = d_ip
        out["src_port"] = s_po
        out["dst_port"] = d_po
    else:
        out["src_ip"] = "0.0.0.0"
        out["dst_ip"] = "0.0.0.0"

    # Ports (prefer explicit columns when present)
    if "src_port" not in out:
        if sel.get("src_port"):
            out["src_port"] = (
                pd.to_numeric(df[sel["src_port"]], errors="coerce")
                .fillna(0)
                .astype(int)
            )
        else:
            out["src_port"] = 0
    if "dst_port" not in out:
        # sometimes "Dst Port" exists even though "src_port" does not
        if sel.get("dst_port"):
            out["dst_port"] = (
                pd.to_numeric(df[sel["dst_port"]], errors="coerce")
                .fillna(0)
                .astype(int)
            )
        else:
            out["dst_port"] = 0

    # ---- Protocol ----
    proto_raw = df[sel["protocol"]]
    if np.issubdtype(proto_raw.dtype, np.number):
        out["protocol"] = proto_raw.map(PROTO_MAP).fillna("other")
    else:
        out["protocol"] = (
            proto_raw.astype(str)
            .str.lower()
            .str.strip()
            .replace({"6": "tcp", "17": "udp"})
        )
    out.loc[~out["protocol"].isin(["tcp", "udp"]), "protocol"] = "other"

    # ---- Durations / IAT ----
    eps = 1e-6
    flow_dur_us = pd.to_numeric(df[sel["flow_duration_us"]], errors="coerce")
    out["flow_duration"] = (flow_dur_us / 1e6).fillna(0.0).clip(lower=eps)

    if sel.get("flow_iat_mean_us"):
        out["iat_mean"] = (
            pd.to_numeric(df[sel["flow_iat_mean_us"]], errors="coerce") / 1e6
        ).fillna(0.0)
    else:
        out["iat_mean"] = 0.0
    if sel.get("flow_iat_std_us"):
        out["iat_std"] = (
            pd.to_numeric(df[sel["flow_iat_std_us"]], errors="coerce") / 1e6
        ).fillna(0.0)
    else:
        out["iat_std"] = 0.0

    # ---- pkts/bytes ----
    fwd_pkts = pd.to_numeric(df[sel["fwd_pkts"]], errors="coerce").fillna(0)
    bwd_pkts = pd.to_numeric(df[sel["bwd_pkts"]], errors="coerce").fillna(0)
    fwd_bytes = pd.to_numeric(df[sel["fwd_bytes"]], errors="coerce").fillna(0)
    bwd_bytes = pd.to_numeric(df[sel["bwd_bytes"]], errors="coerce").fillna(0)

    out["bidirectional_packets"] = (fwd_pkts + bwd_pkts).astype("int64").clip(lower=1)
    out["bidirectional_bytes"] = (fwd_bytes + bwd_bytes).astype("int64").clip(lower=0)
    out["mean_packet_size"] = out["bidirectional_bytes"] / out["bidirectional_packets"]

    # ---- std_packet_size from "Pkt Len Std" ----
    pkt_len_std_col = sel.get("pkt_len_std")
    if pkt_len_std_col:
        out["std_packet_size"] = pd.to_numeric(
            df[pkt_len_std_col], errors="coerce"
        ).fillna(0.0)
    else:
        out["std_packet_size"] = 0.0  # fallback (rarely needed)

    # ---- derived features expected by trainer ----
    out["bytes_per_packet"] = (
        out["bidirectional_bytes"] / out["bidirectional_packets"]
    ).astype(float)
    out["packets_per_second"] = (
        out["bidirectional_packets"] / out["flow_duration"]
    ).astype(float)

    # ---- Flags ----
    for src_key, out_key in [
        ("syn", "tcp_syn_count"),
        ("fin", "tcp_fin_count"),
        ("rst", "tcp_rst_count"),
    ]:
        col = sel.get(src_key)
        if col:
            out[out_key] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int)
        else:
            out[out_key] = 0

    # ---- Label ----
    lbl = df[sel["label_raw"]].astype(str).str.strip().str.upper()
    out["label"] = (lbl != "BENIGN").astype(int)

    # Keep only tcp/udp
    out = out[out["protocol"].isin(["tcp", "udp"])]

    # Final column order (exactly what trainer expects + id fields)
    keep = [
        "src_ip",
        "dst_ip",
        "src_port",
        "dst_port",
        "protocol",
        "flow_duration",
        "bidirectional_packets",
        "bidirectional_bytes",
        "mean_packet_size",
        "std_packet_size",
        "tcp_syn_count",
        "tcp_fin_count",
        "tcp_rst_count",
        "iat_mean",
        "iat_std",
        "bytes_per_packet",
        "packets_per_second",
        "label",
    ]
    for k in keep:
        if k not in out:
            # ensure column exists
            if k.endswith("_ip"):
                out[k] = "0.0.0.0"
            elif k.endswith("_port"):
                out[k] = 0
            elif k == "protocol":
                out[k] = "other"
            elif k == "label":
                out[k] = 0
            else:
                out[k] = 0.0
    return out[keep]


def _usable_file_header(
    path: str,
) -> Optional[Tuple[Dict[str, Optional[str]], Dict[str, str]]]:
    # Read just the header (nrows=1) to decide if the file is usable and to build the selector
    try:
        df_head = pd.read_csv(path, nrows=1, low_memory=False)
        sel, norm = _select_columns(df_head)
        return sel, norm
    except Exception as e:
        print("SKIP (header)", path, repr(e))
        return None


def process_file(path: str, target_rows: int, seed: int = 42) -> pd.DataFrame:
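    """Stream the CSV in chunks, normalize each chunk with _process_block, and
    sample rows until target_rows is reached (target_rows <= 0 keeps all rows)."""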
    head = _usable_file_header(path)
    if head is None:
        return pd.DataFrame()
    sel, norm = head

    chunksize = 200_000  # tune if needed
    rng = np.random.default_rng(seed)
    taken = 0
    parts = []
    try:
        for chunk in pd.read_csv(path, chunksize=chunksize, low_memory=False):
            df_proc = _process_block(chunk, sel, norm)
            if target_rows <= 0:
                parts.append(df_proc)
                continue
            if taken >= target_rows:
                break
            need = target_rows - taken
            if len(df_proc) <= need:
                parts.append(df_proc)
                taken += len(df_proc)
            else:
                frac = need / len(df_proc)
                sample = df_proc.sample(
                    frac=frac, random_state=int(rng.integers(0, 2**31 - 1))
                )
                parts.append(sample)
                taken += len(sample)
            if taken >= target_rows:
                break
    except Exception as e:
        print("SKIP (read)", path, repr(e))
        return pd.DataFrame()

    if not parts:
        return pd.DataFrame()
    return pd.concat(parts, ignore_index=True)


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--in_glob",
        required=True,
        help='e.g. "data/raw/csecic2018/*TrafficForML_CICFlowMeter*.csv"',
    )
    ap.add_argument("--out_csv", required=True)
    ap.add_argument(
        "--limit",
        type=int,
        default=0,
        help="max total rows across all files (sampled per file). 0 = take all (can be huge).",
    )
    args = ap.parse_args()

    files = sorted(glob.glob(args.in_glob))
    if not files:
        raise SystemExit(f"No files matched: {args.in_glob}")

    # Probe which files are usable
    usable = []
    for f in files:
        if _usable_file_header(f) is not None:
            usable.append(f)
        else:
            print("SKIP", f, "→ missing core columns")

    if not usable:
        raise SystemExit("No usable files found.")

    # Decide per-file sampling target if limit > 0
    per_file_target = 0
    remainder = 0
    if args.limit > 0:
        per_file_target = args.limit // len(usable)
        remainder = args.limit % len(usable)

    parts = []
    for i, f in enumerate(usable):
        tgt = 0
        if args.limit > 0:
            tgt = per_file_target + (1 if i < remainder else 0)
        print(f"Processing {f} target_rows={tgt if tgt > 0 else 'ALL'}")
        df_part = process_file(f, tgt)
        if not df_part.empty:
            parts.append(df_part)
            print("OK", f, "rows:", len(df_part))
        else:
            print("SKIP", f, "→ no rows after processing")

    big = pd.concat(parts, ignore_index=True) if parts else pd.DataFrame()
    if big.empty:
        raise SystemExit("No data produced. Check inputs/columns.")

    big.to_csv(args.out_csv, index=False)
    print("Wrote", args.out_csv, "rows:", len(big))


if __name__ == "__main__":
    main()
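
# Example invocation (the --in_glob value is taken from the help text above;
# the output path and limit are illustrative):
#   python prepare_csecic2018.py \
#       --in_glob "data/raw/csecic2018/*TrafficForML_CICFlowMeter*.csv" \
#       --out_csv data/processed/csecic2018.csv \
#       --limit 500000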