Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,10 @@
Sink module for `socket`
"""

from aikido_zen.context import get_current_context
from aikido_zen.helpers.get_argument import get_argument
from aikido_zen.sinks import on_import, patch_function, after
from aikido_zen.sinks.socket.report_and_check_hostname import report_and_check_hostname
from aikido_zen.vulnerabilities import run_vulnerability_scan
from aikido_zen.thread.thread_cache import get_cache
from aikido_zen.errors import AikidoSSRF


def report_and_check_hostname(hostname, port):
cache = get_cache()
if not cache:
return

cache.hostnames.add(hostname, port)

context = get_current_context()
is_bypassed = context and cache.is_bypassed_ip(context.remote_address)

if cache.config and not is_bypassed:
if cache.config.should_block_outgoing_request(hostname):
raise AikidoSSRF(f"Zen has blocked an outbound connection to {hostname}")


@after
Expand Down
14 changes: 14 additions & 0 deletions aikido_zen/sinks/socket/normalize_hostname.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
def normalize_hostname(hostname):
if not hostname or not isinstance(hostname, str):
return hostname

result = hostname
try:
# Check if hostname contains punycode (starts with xn--)
if hostname.startswith("xn--"):
result = hostname.encode("ascii").decode("idna")

return result
except (UnicodeError, LookupError):
# If decoding fails, return original hostname
return hostname
100 changes: 100 additions & 0 deletions aikido_zen/sinks/socket/normalize_hostname_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Test module for normalize_hostname function
"""

import pytest
from aikido_zen.sinks.socket.normalize_hostname import normalize_hostname


def test_normalize_hostname_none():
"""Test that None input returns None"""
assert normalize_hostname(None) is None


def test_normalize_hostname_empty_string():
"""Test that empty string returns empty string"""
assert normalize_hostname("") == ""


def test_normalize_hostname_regular_hostname():
"""Test that regular hostnames are returned unchanged"""
assert normalize_hostname("example.com") == "example.com"
assert normalize_hostname("subdomain.example.com") == "subdomain.example.com"
assert normalize_hostname("localhost") == "localhost"


def test_normalize_hostname_ip_address():
"""Test that IP addresses are returned unchanged"""
assert normalize_hostname("127.0.0.1") == "127.0.0.1"
assert normalize_hostname("::1") == "::1"
assert normalize_hostname("192.168.1.1") == "192.168.1.1"


def test_normalize_hostname_punycode_basic():
"""Test basic punycode conversion"""
# xn--test-5qa.com should decode to test.com (but test.com is ASCII, so this won't work)
# Let's use a realistic example instead
assert normalize_hostname("xn--mller-kva.example") == "müller.example"


def test_normalize_hostname_punycode_unicode():
"""Test punycode with unicode characters"""
# xn--mller-kva.example should decode to müller.example
result = normalize_hostname("xn--mller-kva.example")
assert result == "müller.example"

# xn--caf-dma.example should decode to café.example
result = normalize_hostname("xn--caf-dma.example")
assert result == "café.example"


def test_normalize_hostname_punycode_subdomain():
"""Test punycode in subdomains"""
# xn--mller-kva.example.com should decode to müller.example.com
result = normalize_hostname("xn--mller-kva.example.com")
assert result == "müller.example.com"


def test_normalize_hostname_mixed_case():
"""Test that case is preserved in non-punycode hostnames"""
assert normalize_hostname("Example.COM") == "Example.COM"
assert normalize_hostname("MixedCase.Example.com") == "MixedCase.Example.com"


def test_normalize_hostname_non_string_input():
"""Test that non-string inputs are returned unchanged"""
assert normalize_hostname(123) == 123
assert normalize_hostname([]) == []
assert normalize_hostname({}) == {}


def test_normalize_hostname_punycode_with_port():
"""Test that punycode hostnames with ports are handled correctly"""
# This should only normalize the hostname part, not the port
result = normalize_hostname("xn--mller-kva.example:8080")
assert result == "müller.example:8080"


def test_normalize_hostname_complex_punycode():
"""Test complex punycode examples"""
# Chinese characters: xn--fiqs8s.example should decode to 中国.example
result = normalize_hostname("xn--fiqs8s.example")
assert result == "中国.example"

# Japanese characters: xn--eckwd4c7cu47r2wf.example should decode to ドメイン名例.example
result = normalize_hostname("xn--eckwd4c7cu47r2wf.example")
assert result == "ドメイン名例.example"


def test_normalize_hostname_punycode_not_starting_with_xn():
"""Test that strings containing xn-- but not starting with it are unchanged"""
assert normalize_hostname("example.xn--test.com") == "example.xn--test.com"
assert normalize_hostname("sub.xn--domain.com") == "sub.xn--domain.com"


def test_normalize_hostname_punycode_error_handling():
"""Test error handling for malformed punycode"""
# This should return the original string if decoding fails
result = normalize_hostname("xn--invalid-punycode")
# Should either return the original or a decoded version if valid
assert isinstance(result, str)
20 changes: 20 additions & 0 deletions aikido_zen/sinks/socket/report_and_check_hostname.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from aikido_zen.context import get_current_context
from aikido_zen.errors import AikidoSSRF
from aikido_zen.sinks.socket.normalize_hostname import normalize_hostname
from aikido_zen.thread.thread_cache import get_cache


def report_and_check_hostname(hostname, port):
cache = get_cache()
if not cache:
return

hostname = normalize_hostname(hostname)
cache.hostnames.add(hostname, port)

context = get_current_context()
is_bypassed = context and cache.is_bypassed_ip(context.remote_address)

if cache.config and not is_bypassed:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nested condition around cache.config and is_bypassed increases indentation; invert the condition or add an early return when config is absent or request is bypassed to flatten control flow.

Details

✨ AI Reasoning
​report_and_check_hostname includes a nested conditional
(if cache.config and not is_bypassed: then if cache.config.should_block_outgoing_request(hostname): raise). This creates two levels of nesting for the blocking decision. Inverting the outer condition or using an early return when config is missing or bypassed would flatten control flow, improving readability and maintainability without changing behavior.

🔧 How do I fix it?
Place parameter validation and guard clauses at the function start. Use early returns to reduce nesting levels and improve readability.

More info - Comment @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.

if cache.config.should_block_outgoing_request(hostname):
raise AikidoSSRF(f"Zen has blocked an outbound connection to {hostname}")
27 changes: 26 additions & 1 deletion aikido_zen/sinks/tests/socket_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_socket_getaddrinfo_block_all_new_requests():
def test_socket_getaddrinfo_no_cache():
"""Test that getaddrinfo works normally when cache is not available"""
# Mock get_cache to return None
with patch("aikido_zen.sinks.socket.get_cache", return_value=None):
with patch("aikido_zen.thread.thread_cache.get_cache", return_value=None):
# Test that allowed domain doesn't throw an error when cache is unavailable
try:
socket.getaddrinfo("localhost", 80)
Expand Down Expand Up @@ -255,3 +255,28 @@ def test_socket_getaddrinfo_ip_address_as_hostname():
assert hostnames[0]["hostname"] == "8.8.8.8"
assert hostnames[0]["port"] == 53
assert hostnames[0]["hits"] == 1


def test_punycode_normalization():
# Reset cache and set up blocking
cache = get_cache()
cache.reset()
cache.config.update_domains(
[
{"hostname": "ssrf-rédirects.testssandbox.com", "mode": "block"},
]
)

with pytest.raises(Exception) as exc_info:
socket.getaddrinfo("xn--ssrf-rdirects-ghb.testssandbox.com", 80)
assert (
"Zen has blocked an outbound connection to ssrf-rédirects.testssandbox.com"
in str(exc_info.value)
)

with pytest.raises(Exception) as exc_info:
socket.getaddrinfo("ssrf-rédirects.testssandbox.com", 80)
assert (
"Zen has blocked an outbound connection to ssrf-rédirects.testssandbox.com"
in str(exc_info.value)
)
Loading