Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Ensure the following dependencies are installed:
Run the ARP Scanner using the following command. You need to provide the network interface (like `eth0`, `wlan0`, or `wlp0s20f3`) for your system:

```bash
sudo `which python3` main.py --interface <interface>
sudo `which python3` main.py --interface <interface> --timeout 500
```

On Ubuntu in case you run into this error:
Expand Down
135 changes: 93 additions & 42 deletions core/arp_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
"""
import ipaddress
import netifaces
from scapy.all import arping, ARP, get_if_addr # pylint: disable=E0611
from scapy.all import ARP, get_if_addr, srp, Ether# pylint: disable=E0611
from PySide6.QtWidgets import ( # pylint: disable=E0611
QMainWindow,
QVBoxLayout,
QLabel,
QWidget,
QDialog,
QListWidgetItem
QListWidgetItem,
QProgressBar
)
from PySide6.QtGui import QIcon, QFont, QColor # pylint: disable=E0611
from PySide6.QtCore import Slot, Qt, QTimer # pylint: disable=E0611
Expand Down Expand Up @@ -54,17 +55,25 @@ def __init__(self, ip_address, mac_address, hostname, device_vendor):

class DeviceDiscoveryDialog(QDialog): # pylint: disable=too-many-instance-attributes
"""Device Discovery"""
def __init__(self, interface, oui_url, parent=None):
def __init__(self, interface, oui_url, timeout=1000, parent=None):
super().__init__(parent)
self.interface = interface
self.mac_vendor_lookup = vendor.MacVendorLookup(oui_url)
self.timeout = timeout

self._ui = Ui_DeviceDiscovery()
self._ui.setupUi(self)

# Initialize the UI and connection setup
self.setup_ui_elements()

# Add a progress bar to the UI
self.progress_bar = QProgressBar(self)
self.progress_bar.setRange(0, 100)
self.progress_bar.setValue(0)
# Add it to the vertical layout (or any layout of your choice)
self._ui.verticalLayout.addWidget(self.progress_bar)

# Initialize scanner and device info storage
self.scanner_timer = None
self.device_info = {} # Store dynamic device info here
Expand Down Expand Up @@ -118,13 +127,23 @@ def add_static_ui_labels(self):
self._ui.verticalLayout.addWidget(default_gateway_label)
self._ui.verticalLayout.addWidget(local_mac_label)

# Add timeout information
timeout_label = QLabel(f"Scan Timeout: {self.timeout}ms")
timeout_label.setStyleSheet("color: black")
self._ui.verticalLayout.addWidget(timeout_label)

def setup_font_for_list_widgets(self):
"""Sets up a uniform font for list widgets."""
font = QFont()
font.setPointSize(12)
self._ui.devices.setFont(font)
self._ui.responses.setFont(font)

@Slot(int)
def update_progress(self, progress):
"""Update progress"""
self.progress_bar.setValue(progress)

@Slot(QListWidgetItem)
def open_device_details(self, item):
"""Click on a device opens another window with details."""
Expand Down Expand Up @@ -179,15 +198,21 @@ def start_scan(self):
return

# Create and start a new ARP scan thread
self.arp_scanner_thread = ARPScannerThread(self.interface, self.mac_vendor_lookup)
self.arp_scanner_thread = ARPScannerThread(
self.interface,
self.mac_vendor_lookup,
self.timeout/1000
)
self.arp_scanner_thread.partialResults.connect(self.handle_partial_results)
self.arp_scanner_thread.finished.connect(self.handle_scan_results)
self.arp_scanner_thread.progressChanged.connect(self.update_progress) # New connection
self.arp_scanner_thread.start()
print("Started ARP scan.")
print(f"Started ARP scan with timeout: {self.timeout}ms")

@Slot(list)
def handle_partial_results(self, partial_results):
for ip_address, mac, hostname, device_vendor, packet in partial_results:
"""Update partials"""
for ip_address, mac, hostname, device_vendor, packet in partial_results: # pylint: disable=unused-variable
self.add_device_to_list(ip_address, mac, hostname, device_vendor)

@Slot(list)
Expand Down Expand Up @@ -229,9 +254,11 @@ def quit_application(self):
self.arp_scanner_thread.wait()
self.close()

class ARPScannerThread(QThread):
class ARPScannerThread(QThread): # pylint: disable=too-few-public-methods
"""ARP scanner"""
finished = Signal(list) # Final results
partialResults = Signal(list) # Intermediate results
progressChanged = Signal(int) # New signal for progress (0-100%)

def __init__(self, interface, mac_vendor_lookup, timeout=1):
super().__init__()
Expand All @@ -247,10 +274,10 @@ def _scan_ip_native(self, src_ip, target_ip):
self.interface,
str(src_ip),
str(target_ip),
int(self.timeout * 300) # 300ms timeout per scan
int(self.timeout * 1000) # Convert to ms
)
return target_ip, result
except Exception as e:
except Exception as e: # pylint: disable=broad-exception-caught
print(f"Error scanning {target_ip}: {e}")
return target_ip, None

Expand All @@ -262,6 +289,7 @@ def _create_arp_response(self, ip_addr, mac):
})()

def run(self):
"""Run the ARP scan thread."""
src_ip = get_if_addr(self.interface)
try:
netmask = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['netmask']
Expand All @@ -270,38 +298,61 @@ def run(self):
self.finished.emit([])
return

arp_results = []
network = ipaddress.IPv4Network(network_cidr)
arp_results = self._scan_network(src_ip, network)
self.finished.emit(arp_results)

def _scan_network(self, src_ip, network):
"""Scan the given network and return ARP results."""
hosts = [str(ip) for ip in network.hosts() if str(ip) != src_ip]
if self.use_native:
print("Using native ARP scanner")
network = ipaddress.IPv4Network(network_cidr)
count = 0
for ip in network.hosts():
if str(ip) == src_ip:
continue # Skip scanning our own IP
target_ip, result = self._scan_ip_native(src_ip, str(ip))
if result:
device_vendor = self.mac_vendor_lookup.lookup_vendor(result['mac'])
hostname = net.get_hostname(target_ip)
arp_response = self._create_arp_response(target_ip, result['mac'])
arp_results.append((target_ip, result['mac'], hostname, device_vendor, arp_response))
count += 1
# Every 10 IPs (or any chosen interval), emit partial results
if count % 10 == 0:
self.partialResults.emit(arp_results)
self.finished.emit(arp_results)
else:
print("Using Scapy ARP scanner")
return self._run_native_scan(src_ip, hosts)
print("Using Scapy ARP scanner with progress updates")
return self._run_scapy_scan(hosts)

def _run_native_scan(self, src_ip, hosts):
"""Perform native ARP scanning on a list of hosts."""
arp_results = []
total = len(hosts)
for count, ip in enumerate(hosts, start=1):
target_ip, result = self._scan_ip_native(src_ip, ip)
if result:
device_vendor = self.mac_vendor_lookup.lookup_vendor(result['mac'])
hostname = net.get_hostname(target_ip)
arp_response = self._create_arp_response(target_ip, result['mac'])
arp_results.append(target_ip,
result['mac'],
hostname,
device_vendor,
arp_response
)
self._update_progress(count, total, arp_results)
return arp_results

def _run_scapy_scan(self, hosts):
"""Perform Scapy ARP scanning on a list of hosts."""
arp_results = []
total = len(hosts)
for count, ip in enumerate(hosts, start=1):
try:
arp_packets = arping(network_cidr, timeout=self.timeout, verbose=1)[0]
except Exception as e:
print(f"Error during ARP scan: {e}")
self.finished.emit([])
return

for packet in arp_packets:
ip_addr = packet[1][ARP].psrc
mac = packet[1][ARP].hwsrc
device_vendor = self.mac_vendor_lookup.lookup_vendor(mac)
hostname = net.get_hostname(ip_addr)
arp_results.append((ip_addr, mac, hostname, device_vendor, packet[1][ARP]))
self.finished.emit(arp_results)
ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip),
timeout=self.timeout, verbose=0)
if ans:
for _, received in ans:
ip_addr = received.psrc
mac = received.hwsrc
device_vendor = self.mac_vendor_lookup.lookup_vendor(mac)
hostname = net.get_hostname(ip_addr)
arp_results.append((ip_addr, mac, hostname, device_vendor, received))
except Exception as e: # pylint: disable=broad-exception-caught
print(f"Error scanning {ip}: {e}")
self._update_progress(count, total, arp_results)
return arp_results

def _update_progress(self, count, total, arp_results):
"""Update progress and emit partial results every 10 hosts."""
progress = int((count / total) * 100)
self.progressChanged.emit(progress)
if count % 10 == 0:
self.partialResults.emit(arp_results)
Binary file modified images/phantom.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='ARP Sniffer')
parser.add_argument('--interface', required=True, help='Network interface name')
parser.add_argument('--timeout', type=int, default=1000, help='Timeout in milliseconds for ARP scan (default: 1000)')
args = parser.parse_args()

app = QApplication(sys.argv)

window = DeviceDiscoveryDialog(
args.interface,
oui_url="http://standards-oui.ieee.org/oui/oui.csv"
oui_url="http://standards-oui.ieee.org/oui/oui.csv",
timeout=args.timeout
)
window.show()

Expand Down
Loading