diff --git a/README.md b/README.md index 71abbd5..ee3ca8f 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ Ensure the following dependencies are installed: Run the ARP Scanner using the following command. You need to provide the network interface (like `eth0`, `wlan0`, or `wlp0s20f3`) for your system: ```bash - sudo `which python3` main.py --interface + sudo `which python3` main.py --interface --timeout 500 ``` On Ubuntu in case you run into this error: diff --git a/core/arp_scanner.py b/core/arp_scanner.py index e476548..84ec675 100644 --- a/core/arp_scanner.py +++ b/core/arp_scanner.py @@ -3,14 +3,15 @@ """ import ipaddress import netifaces -from scapy.all import arping, ARP, get_if_addr # pylint: disable=E0611 +from scapy.all import ARP, get_if_addr, srp, Ether# pylint: disable=E0611 from PySide6.QtWidgets import ( # pylint: disable=E0611 QMainWindow, QVBoxLayout, QLabel, QWidget, QDialog, - QListWidgetItem + QListWidgetItem, + QProgressBar ) from PySide6.QtGui import QIcon, QFont, QColor # pylint: disable=E0611 from PySide6.QtCore import Slot, Qt, QTimer # pylint: disable=E0611 @@ -54,10 +55,11 @@ def __init__(self, ip_address, mac_address, hostname, device_vendor): class DeviceDiscoveryDialog(QDialog): # pylint: disable=too-many-instance-attributes """Device Discovery""" - def __init__(self, interface, oui_url, parent=None): + def __init__(self, interface, oui_url, timeout=1000, parent=None): super().__init__(parent) self.interface = interface self.mac_vendor_lookup = vendor.MacVendorLookup(oui_url) + self.timeout = timeout self._ui = Ui_DeviceDiscovery() self._ui.setupUi(self) @@ -65,6 +67,13 @@ def __init__(self, interface, oui_url, parent=None): # Initialize the UI and connection setup self.setup_ui_elements() + # Add a progress bar to the UI + self.progress_bar = QProgressBar(self) + self.progress_bar.setRange(0, 100) + self.progress_bar.setValue(0) + # Add it to the vertical layout (or any layout of your choice) + self._ui.verticalLayout.addWidget(self.progress_bar) + # Initialize scanner and device info storage self.scanner_timer = None self.device_info = {} # Store dynamic device info here @@ -118,6 +127,11 @@ def add_static_ui_labels(self): self._ui.verticalLayout.addWidget(default_gateway_label) self._ui.verticalLayout.addWidget(local_mac_label) + # Add timeout information + timeout_label = QLabel(f"Scan Timeout: {self.timeout}ms") + timeout_label.setStyleSheet("color: black") + self._ui.verticalLayout.addWidget(timeout_label) + def setup_font_for_list_widgets(self): """Sets up a uniform font for list widgets.""" font = QFont() @@ -125,6 +139,11 @@ def setup_font_for_list_widgets(self): self._ui.devices.setFont(font) self._ui.responses.setFont(font) + @Slot(int) + def update_progress(self, progress): + """Update progress""" + self.progress_bar.setValue(progress) + @Slot(QListWidgetItem) def open_device_details(self, item): """Click on a device opens another window with details.""" @@ -179,15 +198,21 @@ def start_scan(self): return # Create and start a new ARP scan thread - self.arp_scanner_thread = ARPScannerThread(self.interface, self.mac_vendor_lookup) + self.arp_scanner_thread = ARPScannerThread( + self.interface, + self.mac_vendor_lookup, + self.timeout/1000 + ) self.arp_scanner_thread.partialResults.connect(self.handle_partial_results) self.arp_scanner_thread.finished.connect(self.handle_scan_results) + self.arp_scanner_thread.progressChanged.connect(self.update_progress) # New connection self.arp_scanner_thread.start() - print("Started ARP scan.") - + print(f"Started ARP scan with timeout: {self.timeout}ms") + @Slot(list) def handle_partial_results(self, partial_results): - for ip_address, mac, hostname, device_vendor, packet in partial_results: + """Update partials""" + for ip_address, mac, hostname, device_vendor, packet in partial_results: # pylint: disable=unused-variable self.add_device_to_list(ip_address, mac, hostname, device_vendor) @Slot(list) @@ -229,9 +254,11 @@ def quit_application(self): self.arp_scanner_thread.wait() self.close() -class ARPScannerThread(QThread): +class ARPScannerThread(QThread): # pylint: disable=too-few-public-methods + """ARP scanner""" finished = Signal(list) # Final results partialResults = Signal(list) # Intermediate results + progressChanged = Signal(int) # New signal for progress (0-100%) def __init__(self, interface, mac_vendor_lookup, timeout=1): super().__init__() @@ -247,10 +274,10 @@ def _scan_ip_native(self, src_ip, target_ip): self.interface, str(src_ip), str(target_ip), - int(self.timeout * 300) # 300ms timeout per scan + int(self.timeout * 1000) # Convert to ms ) return target_ip, result - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught print(f"Error scanning {target_ip}: {e}") return target_ip, None @@ -262,6 +289,7 @@ def _create_arp_response(self, ip_addr, mac): })() def run(self): + """Run the ARP scan thread.""" src_ip = get_if_addr(self.interface) try: netmask = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['netmask'] @@ -270,38 +298,61 @@ def run(self): self.finished.emit([]) return - arp_results = [] + network = ipaddress.IPv4Network(network_cidr) + arp_results = self._scan_network(src_ip, network) + self.finished.emit(arp_results) + + def _scan_network(self, src_ip, network): + """Scan the given network and return ARP results.""" + hosts = [str(ip) for ip in network.hosts() if str(ip) != src_ip] if self.use_native: print("Using native ARP scanner") - network = ipaddress.IPv4Network(network_cidr) - count = 0 - for ip in network.hosts(): - if str(ip) == src_ip: - continue # Skip scanning our own IP - target_ip, result = self._scan_ip_native(src_ip, str(ip)) - if result: - device_vendor = self.mac_vendor_lookup.lookup_vendor(result['mac']) - hostname = net.get_hostname(target_ip) - arp_response = self._create_arp_response(target_ip, result['mac']) - arp_results.append((target_ip, result['mac'], hostname, device_vendor, arp_response)) - count += 1 - # Every 10 IPs (or any chosen interval), emit partial results - if count % 10 == 0: - self.partialResults.emit(arp_results) - self.finished.emit(arp_results) - else: - print("Using Scapy ARP scanner") + return self._run_native_scan(src_ip, hosts) + print("Using Scapy ARP scanner with progress updates") + return self._run_scapy_scan(hosts) + + def _run_native_scan(self, src_ip, hosts): + """Perform native ARP scanning on a list of hosts.""" + arp_results = [] + total = len(hosts) + for count, ip in enumerate(hosts, start=1): + target_ip, result = self._scan_ip_native(src_ip, ip) + if result: + device_vendor = self.mac_vendor_lookup.lookup_vendor(result['mac']) + hostname = net.get_hostname(target_ip) + arp_response = self._create_arp_response(target_ip, result['mac']) + arp_results.append(target_ip, + result['mac'], + hostname, + device_vendor, + arp_response + ) + self._update_progress(count, total, arp_results) + return arp_results + + def _run_scapy_scan(self, hosts): + """Perform Scapy ARP scanning on a list of hosts.""" + arp_results = [] + total = len(hosts) + for count, ip in enumerate(hosts, start=1): try: - arp_packets = arping(network_cidr, timeout=self.timeout, verbose=1)[0] - except Exception as e: - print(f"Error during ARP scan: {e}") - self.finished.emit([]) - return - - for packet in arp_packets: - ip_addr = packet[1][ARP].psrc - mac = packet[1][ARP].hwsrc - device_vendor = self.mac_vendor_lookup.lookup_vendor(mac) - hostname = net.get_hostname(ip_addr) - arp_results.append((ip_addr, mac, hostname, device_vendor, packet[1][ARP])) - self.finished.emit(arp_results) + ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip), + timeout=self.timeout, verbose=0) + if ans: + for _, received in ans: + ip_addr = received.psrc + mac = received.hwsrc + device_vendor = self.mac_vendor_lookup.lookup_vendor(mac) + hostname = net.get_hostname(ip_addr) + arp_results.append((ip_addr, mac, hostname, device_vendor, received)) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error scanning {ip}: {e}") + self._update_progress(count, total, arp_results) + return arp_results + + def _update_progress(self, count, total, arp_results): + """Update progress and emit partial results every 10 hosts.""" + progress = int((count / total) * 100) + self.progressChanged.emit(progress) + if count % 10 == 0: + self.partialResults.emit(arp_results) diff --git a/images/phantom.png b/images/phantom.png index 1327f07..b07b538 100644 Binary files a/images/phantom.png and b/images/phantom.png differ diff --git a/main.py b/main.py index e1b5b3a..fd9dad4 100644 --- a/main.py +++ b/main.py @@ -14,13 +14,15 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description='ARP Sniffer') parser.add_argument('--interface', required=True, help='Network interface name') + parser.add_argument('--timeout', type=int, default=1000, help='Timeout in milliseconds for ARP scan (default: 1000)') args = parser.parse_args() app = QApplication(sys.argv) window = DeviceDiscoveryDialog( args.interface, - oui_url="http://standards-oui.ieee.org/oui/oui.csv" + oui_url="http://standards-oui.ieee.org/oui/oui.csv", + timeout=args.timeout ) window.show()