# ids_iforest/scripts/generate_datasets.py
1"""Generate synthetic training datasets for the ids_iforest project.
3This script creates CSV files containing artificial flow records for
4benign and malicious traffic. The synthetic data is meant to
5exercise the Isolation Forest model during development. It is not a
6replacement for real labelled datasets like CIC‑IDS 2017, but it
7provides a quick way to test training and detection logic on a modest
8machine.
10The generated flows include two categories of anomalies:
12* SYN flood: very large numbers of packets with many SYN flags and
13 short durations.
14* Port scan: high connection rates to many destination ports with
15 extremely low packet counts per flow.
17Each flow has a ``label`` column set to 0 for benign or 1 for
18malicious. Numeric features follow the definitions used by
19``flows_to_dataframe``.
20"""

from __future__ import annotations

import argparse
import ipaddress
import random
from typing import List

import pandas as pd  # type: ignore

__all__ = ["main"]


def _random_ip(private: bool = True) -> str:
    """Return a random IPv4 address.

    When ``private`` is true, pick from RFC1918 ranges; otherwise draw a
    uniformly random address from the full IPv4 space.
    """
    if private:
        # Pick a private /16 randomly
        block = random.choice([(10, 8), (172, 12), (192, 16)])
        if block[0] == 10:
            octet1 = 10
            octet2 = random.randint(0, 255)
        elif block[0] == 172:
            octet1 = 172
            octet2 = random.randint(16, 31)
        else:
            octet1 = 192
            octet2 = 168
        octet3 = random.randint(0, 255)
        octet4 = random.randint(1, 254)
        return f"{octet1}.{octet2}.{octet3}.{octet4}"
    else:
        return str(ipaddress.IPv4Address(random.getrandbits(32)))
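
# Note: generate_benign() below uses _random_ip(True) for the source address and
# _random_ip(False) for the destination, while the two attack generators reverse
# that convention (external source, private destination). A private draw looks
# like e.g. "10.37.22.8" or "172.23.140.7"; a non-private draw can be any IPv4
# address.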


def generate_benign(n: int) -> pd.DataFrame:
    """Generate ``n`` benign flows with realistic random values."""
    rows: List[dict] = []
    for _ in range(n):
        pkt_count = random.randint(1, 50)
        total_bytes = pkt_count * random.randint(40, 1500)
        mean_ps = total_bytes / pkt_count
        std_ps = mean_ps * random.uniform(0.1, 0.5)
        duration = random.uniform(0.001, 5.0)
        row = {
            "window": 0,
            "src_ip": _random_ip(True),
            "dst_ip": _random_ip(False),
            "src_port": random.randint(1025, 65535),
            "dst_port": random.choice([80, 443, 22, 25, 53, 123])
            if random.random() < 0.8
            else random.randint(1, 65535),
            "protocol": random.choice(["tcp", "udp"]),
            "bidirectional_packets": pkt_count,
            "bidirectional_bytes": total_bytes,
            "mean_packet_size": mean_ps,
            "std_packet_size": std_ps,
            "flow_duration": duration,
            "tcp_syn_count": 0,
            "tcp_fin_count": 0,
            "tcp_rst_count": 0,
            "iat_mean": duration / pkt_count,
            "iat_std": (duration / pkt_count) * random.uniform(0.1, 0.5),
            "bytes_per_packet": total_bytes / pkt_count,
            "packets_per_second": pkt_count / duration,
            "label": 0,
        }
        rows.append(row)
    return pd.DataFrame(rows)


def generate_syn_flood(n: int) -> pd.DataFrame:
    """Generate ``n`` SYN‑flood attack flows."""
    rows: List[dict] = []
    for _ in range(n):
        pkt_count = random.randint(1000, 5000)
        total_bytes = pkt_count * random.randint(40, 80)
        duration = random.uniform(0.01, 1.0)
        row = {
            "window": 0,
            "src_ip": _random_ip(False),
            "dst_ip": _random_ip(True),
            "src_port": random.randint(1025, 65535),
            "dst_port": random.randint(1, 1024),
            "protocol": "tcp",
            "bidirectional_packets": pkt_count,
            "bidirectional_bytes": total_bytes,
            "mean_packet_size": total_bytes / pkt_count,
            "std_packet_size": random.uniform(0, 5),
            "flow_duration": duration,
            "tcp_syn_count": pkt_count,
            "tcp_fin_count": 0,
            "tcp_rst_count": 0,
            "iat_mean": duration / pkt_count,
            "iat_std": 0.0,
            "bytes_per_packet": total_bytes / pkt_count,
            "packets_per_second": pkt_count / duration,
            "label": 1,
        }
        rows.append(row)
    return pd.DataFrame(rows)


def generate_port_scan(n: int) -> pd.DataFrame:
    """Generate ``n`` port scan attack flows."""
    rows: List[dict] = []
    for _ in range(n):
        pkt_count = random.randint(1, 3)
        total_bytes = pkt_count * random.randint(40, 200)
        duration = random.uniform(0.0001, 0.01)
        row = {
            "window": 0,
            "src_ip": _random_ip(False),
            "dst_ip": _random_ip(True),
            "src_port": random.randint(1025, 65535),
            "dst_port": random.randint(1, 65535),
            "protocol": "tcp",
            "bidirectional_packets": pkt_count,
            "bidirectional_bytes": total_bytes,
            "mean_packet_size": total_bytes / pkt_count,
            "std_packet_size": 0.0,
            "flow_duration": duration,
            "tcp_syn_count": pkt_count,
            "tcp_fin_count": 0,
            "tcp_rst_count": 0,
            "iat_mean": duration / pkt_count,
            "iat_std": 0.0,
            "bytes_per_packet": total_bytes / pkt_count,
            "packets_per_second": pkt_count / duration,
            "label": 1,
        }
        rows.append(row)
    return pd.DataFrame(rows)
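
# Both attack generators mark every packet as a SYN (tcp_syn_count equals
# bidirectional_packets). SYN-flood rows combine thousands of packets with
# sub-second durations, giving very high packets_per_second, while port-scan
# rows are tiny flows of 1-3 packets lasting at most 10 ms, aimed at a random
# destination port.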


def generate_dataset(
    n_benign: int,
    n_syn_flood: int,
    n_port_scan: int,
) -> pd.DataFrame:
    """Combine benign and attack flows into a single shuffled DataFrame."""
    dfs = [
        generate_benign(n_benign),
        generate_syn_flood(n_syn_flood),
        generate_port_scan(n_port_scan),
    ]
    df = pd.concat(dfs, ignore_index=True)
    df = df.sample(frac=1.0, random_state=42).reset_index(drop=True)
    return df
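
# Illustrative use from an interactive session (the counts are the same defaults
# that main() uses below):
#
#   df = generate_dataset(n_benign=1000, n_syn_flood=100, n_port_scan=100)
#   df["label"].value_counts()  # 1000 benign rows (0) and 200 attack rows (1)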


def main() -> None:
    """Entry point for the ``ids-iforest-generate`` console script."""
    ap = argparse.ArgumentParser(
        description="Generate synthetic training datasets for ids_iforest"
    )
    ap.add_argument("--benign", type=int, default=1000, help="Number of benign flows")
    ap.add_argument(
        "--syn-flood", type=int, default=100, help="Number of SYN flood attack flows"
    )
    ap.add_argument(
        "--port-scan", type=int, default=100, help="Number of port scan attack flows"
    )
    ap.add_argument("--out", required=True, help="Output CSV file path")
    args = ap.parse_args()
    df = generate_dataset(args.benign, args.syn_flood, args.port_scan)
    df.to_csv(args.out, index=False)
    print(f"Generated dataset with {len(df)} flows → {args.out}")


if __name__ == "__main__":  # pragma: no cover
    main()