diff --git a/msal/__main__.py b/msal/__main__.py index 4107fd89..a28801eb 100644 --- a/msal/__main__.py +++ b/msal/__main__.py @@ -339,6 +339,8 @@ def _main(): logging.error("Invalid input: %s", e) except KeyboardInterrupt: # Useful for bailing out a stuck interactive flow print("Aborted") + except Exception as e: + logging.error("Error: %s", e) if __name__ == "__main__": _main() diff --git a/msal/application.py b/msal/application.py index 57e40980..afdb6dfa 100644 --- a/msal/application.py +++ b/msal/application.py @@ -920,6 +920,9 @@ def initiate_auth_code_flow( ): """Initiate an auth code flow. + .. deprecated:: + The response_mode parameter is deprecated and will be removed in a future version. + Later when the response reaches your redirect_uri, you can use :func:`~acquire_token_by_auth_code_flow()` to complete the authentication/authorization. @@ -960,18 +963,16 @@ def initiate_auth_code_flow( New in version 1.15. :param str response_mode: - OPTIONAL. Specifies the method with which response parameters should be returned. - The default value is equivalent to ``query``, which is still secure enough in MSAL Python - (because MSAL Python does not transfer tokens via query parameter in the first place). - For even better security, we recommend using the value ``form_post``. - In "form_post" mode, response parameters - will be encoded as HTML form values that are transmitted via the HTTP POST method and - encoded in the body using the application/x-www-form-urlencoded format. - Valid values can be either "form_post" for HTTP POST to callback URI or - "query" (the default) for HTTP GET with parameters encoded in query string. - More information on possible values - `here ` - and `here ` + .. deprecated:: + This parameter is deprecated and will be removed in a future version. + + **For PublicClientApplication**: response_mode is automatically set to + ``form_post`` for security reasons. This parameter is ignored. + + **For ConfidentialClientApplication**: You should configure your web + framework to accept form_post responses instead of query responses. + While this parameter still works, it will be removed in a future version. + Using query-based response modes is less secure and should be avoided. :return: The auth code flow. It is a dict in this form:: @@ -991,6 +992,15 @@ def initiate_auth_code_flow( 3. and then relay this dict and subsequent auth response to :func:`~acquire_token_by_auth_code_flow()`. """ + if response_mode is not None: + import warnings + warnings.warn( + "The 'response_mode' parameter is deprecated and will be removed in a future version. " + "For public clients, response_mode is automatically set to 'form_post'. " + "For confidential clients, configure your web framework to use 'form_post'. " + "Query-based response modes are less secure.", + DeprecationWarning, + stacklevel=2) client = _ClientWithCcsRoutingInfo( {"authorization_endpoint": self.authority.authorization_endpoint}, self.client_id, diff --git a/msal/oauth2cli/authcode.py b/msal/oauth2cli/authcode.py index ba266223..6c81bb8f 100644 --- a/msal/oauth2cli/authcode.py +++ b/msal/oauth2cli/authcode.py @@ -111,27 +111,70 @@ class _AuthCodeHandler(BaseHTTPRequestHandler): def do_GET(self): # For flexibility, we choose to not check self.path matching redirect_uri #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP') + qs = parse_qs(urlparse(self.path).query) - if qs.get('code') or qs.get("error"): # So, it is an auth response - auth_response = _qs2kv(qs) - logger.debug("Got auth response: %s", auth_response) - if self.server.auth_state and self.server.auth_state != auth_response.get("state"): - # OAuth2 successful and error responses contain state when it was used - # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1 - self._send_full_response("State mismatch") # Possibly an attack - else: - template = (self.server.success_template - if "code" in qs else self.server.error_template) - if _is_html(template.template): - safe_data = _escape(auth_response) # Foiling an XSS attack - else: - safe_data = auth_response - self._send_full_response(template.safe_substitute(**safe_data)) - self.server.auth_response = auth_response # Set it now, after the response is likely sent + if qs.get('code') or qs.get('error'): + # GET request with auth code or error - reject for security (form_post only) + self._send_full_response( + "GET method is not supported for authentication responses. " + "This application requires form_post response mode.", + is_ok=False) + elif not qs: + # Blank redirect from eSTS error - show generic error and mark done + self._send_full_response( + "Authentication could not be completed. " + "You can close this window and return to the application.") + self.server.done = True else: + # Other GET requests - show welcome page self._send_full_response(self.server.welcome_page) # NOTE: Don't do self.server.shutdown() here. It'll halt the server. + def do_POST(self): + # Handle form_post response mode where auth code is sent via POST body + content_length = int(self.headers.get('Content-Length', 0)) + post_data = self.rfile.read(content_length).decode('utf-8') + + qs = parse_qs(post_data) + if qs.get('code') or qs.get('error'): # So, it is an auth response + auth_response = _qs2kv(qs) + logger.debug("Got auth response via POST: %s", auth_response) + self._process_auth_response(auth_response) + else: + self._send_full_response("Invalid POST request", is_ok=False) + # NOTE: Don't do self.server.shutdown() here. It'll halt the server. + + def _process_auth_response(self, auth_response): + """Process the auth response from either GET or POST request.""" + if self.server.auth_state and self.server.auth_state != auth_response.get("state"): + # OAuth2 successful and error responses contain state when it was used + # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1 + self._send_full_response("State mismatch") # Possibly an attack + # Don't set auth_response for security, but mark as done to avoid hanging + self.server.done = True + else: + template = (self.server.success_template + if "code" in auth_response else self.server.error_template) + if _is_html(template.template): + safe_data = _escape(auth_response) # Foiling an XSS attack + else: + safe_data = dict(auth_response) # Make a copy to avoid mutating original + # Provide default values for common OAuth2 response fields + # to avoid showing literal placeholder text like "$error_description" + safe_data.setdefault("error", "") + safe_data.setdefault("error_description", "") + # Format error message nicely: include ": description." only if description exists + if "code" not in auth_response: # This is an error response + error_desc = auth_response.get("error_description", "").strip() + if error_desc: + safe_data["error_message"] = f"{safe_data['error']}: {error_desc}." + else: + safe_data["error_message"] = safe_data["error"] + else: + safe_data["error_message"] = "" + self._send_full_response(template.safe_substitute(**safe_data)) + self.server.auth_response = auth_response # Set it now, after the response is likely sent + def _send_full_response(self, body, is_ok=True): self.send_response(200 if is_ok else 400) content_type = 'text/html' if _is_html(body) else 'text/plain' @@ -287,6 +330,15 @@ def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None, welcome_uri = "http://localhost:{p}".format(p=self.get_port()) abort_uri = "{loc}?error=abort".format(loc=welcome_uri) logger.debug("Abort by visit %s", abort_uri) + + # Enforce response_mode=form_post for security + if auth_uri: + parsed = urlparse(auth_uri) + params = parse_qs(parsed.query) + params['response_mode'] = ['form_post'] # Enforce form_post + new_query = urlencode(params, doseq=True) + auth_uri = parsed._replace(query=new_query).geturl() + self._server.welcome_page = Template(welcome_template or "").safe_substitute( auth_uri=auth_uri, abort_uri=abort_uri) if auth_uri: # Now attempt to open a local browser to visit it @@ -317,19 +369,22 @@ def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None, auth_uri_callback(_uri) self._server.success_template = Template(success_template or - "Authentication completed. You can close this window now.") + "Authentication complete. You can return to the application. Please close this browser tab.\n\n" + "For your security: Do not share the contents of this page, the address bar, or take screenshots.") self._server.error_template = Template(error_template or - "Authentication failed. $error: $error_description. ($error_uri)") + "Authentication failed. $error_message\n\n" + "For your security: Do not share the contents of this page, the address bar, or take screenshots.") self._server.timeout = timeout # Otherwise its handle_timeout() won't work self._server.auth_response = {} # Shared with _AuthCodeHandler self._server.auth_state = state # So handler will check it before sending response + self._server.done = False # Flag to indicate completion without setting auth_response while not self._closing: # Otherwise, the handle_request() attempt # would yield noisy ValueError trace # Derived from # https://docs.python.org/2/library/basehttpserver.html#more-examples self._server.handle_request() - if self._server.auth_response: + if self._server.auth_response or self._server.done: break result.update(self._server.auth_response) # Return via writable result param diff --git a/msal/oauth2cli/oauth2.py b/msal/oauth2cli/oauth2.py index ef32ceaa..1914f126 100644 --- a/msal/oauth2cli/oauth2.py +++ b/msal/oauth2cli/oauth2.py @@ -176,7 +176,22 @@ def _build_auth_request_params(self, response_type, **kwargs): response_type = self._stringify(response_type) params = {'client_id': self.client_id, 'response_type': response_type} - params.update(kwargs) # Note: None values will override params + params.update(kwargs) + if 'response_mode' in params: + import warnings + warnings.warn( + "The 'response_mode' parameter is deprecated and will be removed in a future version. " + "For public clients, response_mode is automatically set to 'form_post'. " + "For confidential clients, configure your web framework to use 'form_post'. " + "Query-based response modes are less secure.", + DeprecationWarning, + stacklevel=3) + if params['response_mode'] != 'form_post': + warnings.warn( + "response_mode='form_post' is recommended for better security. " + "See https://tools.ietf.org/html/rfc6749#section-4.1.2", + UserWarning, + stacklevel=3) params = {k: v for k, v in params.items() if v is not None} # clean up if params.get('scope'): params['scope'] = self._stringify(params['scope']) diff --git a/tests/test_authcode.py b/tests/test_authcode.py index 329cafd8..fd38c294 100644 --- a/tests/test_authcode.py +++ b/tests/test_authcode.py @@ -26,17 +26,159 @@ def test_no_two_concurrent_receivers_can_listen_on_same_port(self): pass def test_template_should_escape_input(self): + """Test that POST request with HTML in error is properly escaped""" with AuthCodeReceiver() as receiver: receiver._scheduled_actions = [( # Injection happens here when the port is known 1, # Delay it until the receiver is activated by get_auth_response() lambda: self.assertEqual( "<tag>foo</tag>", - requests.get("http://localhost:{}?error=foo".format( - receiver.get_port())).text, - "Unsafe data in HTML should be escaped", + requests.post( + "http://localhost:{}".format(receiver.get_port()), + data={"error": "foo"}, + headers={'Content-Type': 'application/x-www-form-urlencoded'} + ).text, ))] receiver.get_auth_response( # Starts server and hang until timeout timeout=3, error_template="$error", ) + def test_get_request_with_auth_code_is_rejected(self): + """Test that GET request with auth code is rejected for security""" + with AuthCodeReceiver() as receiver: + test_code = "test_auth_code_12345" + test_state = "test_state_67890" + receiver._scheduled_actions = [( + 1, + lambda: self._verify_get_rejection( + receiver.get_port(), + code=test_code, + state=test_state + ) + )] + result = receiver.get_auth_response( + timeout=3, + state=test_state, + ) + # Should not receive auth response via GET + self.assertIsNone(result) + + def _verify_get_rejection(self, port, **params): + """Helper to verify GET requests with auth codes are rejected""" + try: + from urllib.parse import urlencode + except ImportError: + from urllib import urlencode + + response = requests.get( + "http://localhost:{}?{}".format(port, urlencode(params)) + ) + # Verify error message about unsupported method + self.assertIn("not supported", response.text.lower()) + self.assertEqual(response.status_code, 400) + + def test_post_request_with_auth_code(self): + """Test that POST request with auth code is handled correctly (form_post response mode)""" + with AuthCodeReceiver() as receiver: + test_code = "test_auth_code_12345" + test_state = "test_state_67890" + receiver._scheduled_actions = [( + 1, + lambda: self._send_post_auth_response( + receiver.get_port(), + code=test_code, + state=test_state + ) + )] + result = receiver.get_auth_response( + timeout=3, + state=test_state, + success_template="Success: Got code $code", + ) + self.assertIsNotNone(result) + self.assertEqual(result.get("code"), test_code) + self.assertEqual(result.get("state"), test_state) + + def test_post_request_with_error(self): + """Test that POST request with error is handled correctly""" + with AuthCodeReceiver() as receiver: + test_error = "access_denied" + test_error_description = "User denied access" + receiver._scheduled_actions = [( + 1, + lambda: self._send_post_auth_response( + receiver.get_port(), + error=test_error, + error_description=test_error_description + ) + )] + result = receiver.get_auth_response( + timeout=3, + error_template="Error: $error - $error_description", + ) + self.assertIsNotNone(result) + self.assertEqual(result.get("error"), test_error) + self.assertEqual(result.get("error_description"), test_error_description) + + def test_post_request_state_mismatch(self): + """Test that POST request with mismatched state is rejected""" + with AuthCodeReceiver() as receiver: + test_code = "test_auth_code_12345" + wrong_state = "wrong_state" + expected_state = "expected_state" + receiver._scheduled_actions = [( + 1, + lambda: self._send_post_auth_response( + receiver.get_port(), + code=test_code, + state=wrong_state + ) + )] + result = receiver.get_auth_response( + timeout=3, + state=expected_state, + ) + # When state mismatches, the response should not be set + self.assertIsNone(result) + + def test_post_request_escapes_html(self): + """Test that POST request with HTML in error is properly escaped""" + with AuthCodeReceiver() as receiver: + malicious_error = "" + receiver._scheduled_actions = [( + 1, + lambda: self._verify_post_escaping(receiver.get_port(), malicious_error) + )] + receiver.get_auth_response( + timeout=3, + error_template="$error", + ) + + def _send_post_auth_response(self, port, **params): + """Helper to send POST request with auth response""" + try: + from urllib.parse import urlencode + except ImportError: + from urllib import urlencode + + response = requests.post( + "http://localhost:{}".format(port), + data=params, + headers={'Content-Type': 'application/x-www-form-urlencoded'} + ) + return response + + def _verify_post_escaping(self, port, malicious_error): + """Helper to verify HTML escaping in POST requests""" + response = self._send_post_auth_response(port, error=malicious_error) + # Verify that the malicious script is escaped + self.assertIn("<script>", response.text) + self.assertNotIn("