Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ venv/*
env/*

.vscode/*
.claude/

# ignore http server log
atests/http_server/http_server.log
95 changes: 95 additions & 0 deletions atests/http_server/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
# See AUTHORS and LICENSE for more information

from flask import Flask, Response, jsonify as flask_jsonify, request
from flask_httpauth import HTTPBasicAuth, HTTPDigestAuth

from .structures import CaseInsensitiveDict
from .helpers import get_dict, status_code
from .utils import weighted_choice


app = Flask(__name__)
app.config['SECRET_KEY'] = 'test-secret-key-for-digest-auth'

# Initialize authentication handlers
basic_auth = HTTPBasicAuth()
digest_auth = HTTPDigestAuth()


def jsonify(*args, **kwargs):
Expand Down Expand Up @@ -191,3 +197,92 @@ def redirect_to():
response.headers["Location"] = args["url"]

return response


# Basic auth verification callback
@basic_auth.verify_password
def verify_basic_password(username, password):
# Get expected credentials from the request path
path_parts = request.path.split('/')
if len(path_parts) >= 4 and path_parts[1] == 'basic-auth':
expected_user = path_parts[2]
expected_pass = path_parts[3]
return username == expected_user and password == expected_pass
return False


@app.route("/basic-auth/<user>/<passwd>")
@basic_auth.login_required
def basic_auth_endpoint(user, passwd):
"""Prompts the user for authorization using HTTP Basic Auth.
---
tags:
- Auth
parameters:
- in: path
name: user
type: string
required: true
- in: path
name: passwd
type: string
required: true
produces:
- application/json
responses:
200:
description: Successful authentication.
401:
description: Unsuccessful authentication.
"""
return jsonify(authenticated=True, user=basic_auth.current_user())


# Digest auth password callback
@digest_auth.get_password
def get_digest_password(username):
# Get expected credentials from the request path
path_parts = request.path.split('/')
if len(path_parts) >= 5 and path_parts[1] == 'digest-auth':
expected_user = path_parts[3]
expected_pass = path_parts[4]
if username == expected_user:
return expected_pass
return None


@app.route("/digest-auth/<qop>/<user>/<passwd>")
@app.route("/digest-auth/<qop>/<user>/<passwd>/<algorithm>")
@digest_auth.login_required
def digest_auth_endpoint(qop, user, passwd, algorithm='MD5'):
"""Prompts the user for authorization using HTTP Digest Auth.
---
tags:
- Auth
parameters:
- in: path
name: qop
type: string
required: true
- in: path
name: user
type: string
required: true
- in: path
name: passwd
type: string
required: true
- in: path
name: algorithm
type: string
required: false
default: MD5
produces:
- application/json
responses:
200:
description: Successful authentication.
401:
description: Unsuccessful authentication.
"""
return jsonify(authenticated=True, user=digest_auth.current_user())
145 changes: 0 additions & 145 deletions atests/http_server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,126 +256,6 @@ def status_code(code):
return r


def check_basic_auth(user, passwd):
"""Checks user authentication using HTTP Basic Auth."""

auth = request.authorization
return auth and auth.username == user and auth.password == passwd


# Digest auth helpers
# qop is a quality of protection


def H(data, algorithm):
if algorithm == 'SHA-256':
return sha256(data).hexdigest()
elif algorithm == 'SHA-512':
return sha512(data).hexdigest()
else:
return md5(data).hexdigest()


def HA1(realm, username, password, algorithm):
"""Create HA1 hash by realm, username, password

HA1 = md5(A1) = MD5(username:realm:password)
"""
if not realm:
realm = u''
return H(b":".join([username.encode('utf-8'),
realm.encode('utf-8'),
password.encode('utf-8')]), algorithm)


def HA2(credentials, request, algorithm):
"""Create HA2 md5 hash

If the qop directive's value is "auth" or is unspecified, then HA2:
HA2 = md5(A2) = MD5(method:digestURI)
If the qop directive's value is "auth-int" , then HA2 is
HA2 = md5(A2) = MD5(method:digestURI:MD5(entityBody))
"""
if credentials.get("qop") == "auth" or credentials.get('qop') is None:
return H(b":".join([request['method'].encode('utf-8'), request['uri'].encode('utf-8')]), algorithm)
elif credentials.get("qop") == "auth-int":
for k in 'method', 'uri', 'body':
if k not in request:
raise ValueError("%s required" % k)
A2 = b":".join([request['method'].encode('utf-8'),
request['uri'].encode('utf-8'),
H(request['body'], algorithm).encode('utf-8')])
return H(A2, algorithm)
raise ValueError


def response(credentials, password, request):
"""Compile digest auth response

If the qop directive's value is "auth" or "auth-int" , then compute the response as follows:
RESPONSE = MD5(HA1:nonce:nonceCount:clienNonce:qop:HA2)
Else if the qop directive is unspecified, then compute the response as follows:
RESPONSE = MD5(HA1:nonce:HA2)

Arguments:
- `credentials`: credentials dict
- `password`: request user password
- `request`: request dict
"""
response = None
algorithm = credentials.get('algorithm')
HA1_value = HA1(
credentials.get('realm'),
credentials.get('username'),
password,
algorithm
)
HA2_value = HA2(credentials, request, algorithm)
if credentials.get('qop') is None:
response = H(b":".join([
HA1_value.encode('utf-8'),
credentials.get('nonce', '').encode('utf-8'),
HA2_value.encode('utf-8')
]), algorithm)
elif credentials.get('qop') == 'auth' or credentials.get('qop') == 'auth-int':
for k in 'nonce', 'nc', 'cnonce', 'qop':
if k not in credentials:
raise ValueError("%s required for response H" % k)
response = H(b":".join([HA1_value.encode('utf-8'),
credentials.get('nonce').encode('utf-8'),
credentials.get('nc').encode('utf-8'),
credentials.get('cnonce').encode('utf-8'),
credentials.get('qop').encode('utf-8'),
HA2_value.encode('utf-8')]), algorithm)
else:
raise ValueError("qop value are wrong")

return response


def check_digest_auth(user, passwd):
"""Check user authentication using HTTP Digest auth"""

if request.headers.get('Authorization'):
credentials = Authorization.from_header(request.headers.get('Authorization'))
if not credentials:
return
request_uri = request.script_root + request.path
if request.query_string:
request_uri += '?' + request.query_string
response_hash = response(credentials, passwd, dict(uri=request_uri,
body=request.data,
method=request.method))
if credentials.get('response') == response_hash:
return True
return False


def secure_cookie():
"""Return true if cookie should have secure attribute"""
return request.environ['wsgi.url_scheme'] == 'https'


def __parse_request_range(range_header_text):
""" Return a tuple describing the byte range requested in a GET request
If the range is open ended on the left or right side, then a value of None
Expand Down Expand Up @@ -453,28 +333,3 @@ def next_stale_after_value(stale_after):
return str(stal_after_count)
except ValueError:
return 'never'


def digest_challenge_response(app, qop, algorithm, stale=False):
response = app.make_response('')
response.status_code = 401

# RFC2616 Section4.2: HTTP headers are ASCII. That means
# request.remote_addr was originally ASCII, so I should be able to
# encode it back to ascii. Also, RFC2617 says about nonces: "The
# contents of the nonce are implementation dependent"
nonce = H(b''.join([
getattr(request, 'remote_addr', u'').encode('ascii'),
b':',
str(time.time()).encode('ascii'),
b':',
os.urandom(10)
]), algorithm)
opaque = H(os.urandom(10), algorithm)

auth = WWWAuthenticate("digest")
auth.set_digest('[email protected]', nonce, opaque=opaque,
qop=('auth', 'auth-int') if qop is None else (qop,), algorithm=algorithm)
auth.stale = stale
response.headers['WWW-Authenticate'] = auth.to_header()
return response
16 changes: 8 additions & 8 deletions atests/test_authentication.robot
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
*** Settings ***
Library RequestsLibrary
Library customAuthenticator.py
Resource res_setup.robot


*** Test Cases ***
Get With Auth
[Tags] get get-cert
${auth}= Create List user passwd
Create Session httpbin https://httpbin.org auth=${auth} verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session httpbin /basic-auth/user/passwd
Create Session authsession ${HTTP_LOCAL_SERVER} auth=${auth}
${resp}= GET On Session authsession /basic-auth/user/passwd
Should Be Equal As Strings ${resp.status_code} 200
Should Be Equal As Strings ${resp.json()['authenticated']} True

Get With Custom Auth
[Tags] get
${auth}= Get Custom Auth user passwd
Create Custom Session httpbin https://httpbin.org auth=${auth} verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session httpbin /basic-auth/user/passwd
Create Custom Session authsession ${HTTP_LOCAL_SERVER} auth=${auth}
${resp}= GET On Session authsession /basic-auth/user/passwd
Should Be Equal As Strings ${resp.status_code} 200
Should Be Equal As Strings ${resp.json()['authenticated']} True

Get With Digest Auth
[Tags] get get-cert
${auth}= Create List user pass
Create Digest Session
... httpbin
... https://httpbin.org
... authsession
... ${HTTP_LOCAL_SERVER}
... auth=${auth}
... debug=3
... verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session httpbin /digest-auth/auth/user/pass
${resp}= GET On Session authsession /digest-auth/auth/user/pass
Should Be Equal As Strings ${resp.status_code} 200
Should Be Equal As Strings ${resp.json()['authenticated']} True
12 changes: 6 additions & 6 deletions atests/test_ssl_certs.robot
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ Library RequestsLibrary
*** Test Cases ***
Get HTTPS & Verify Cert
[Tags] get get-cert
Create Session httpbin https://httpbin.org verify=True
${resp}= GET On Session httpbin /get
Create Session sslsession https://github.com verify=True
${resp}= GET On Session sslsession /
Should Be Equal As Strings ${resp.status_code} 200

Get HTTPS & Verify Cert with a CA bundle
[Tags] get get-cert
Create Session httpbin https://httpbin.org verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session httpbin /get
Create Session sslsession https://github.com verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session sslsession /
Should Be Equal As Strings ${resp.status_code} 200

Get HTTPS with Client Side Certificates
[Tags] get get-cert
@{client_certs}= Create List ${CURDIR}${/}clientcert.pem ${CURDIR}${/}clientkey.pem
Create Client Cert Session crtsession https://server.cryptomix.com/secure client_certs=@{client_certs}
${resp}= GET On Session crtsession /
Create Client Cert Session sslsession https://server.cryptomix.com/secure client_certs=@{client_certs}
${resp}= GET On Session sslsession /
Should Be Equal As Strings ${resp.status_code} 200
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Topic :: Software Development :: Testing
"""[1:-1]

TEST_REQUIRE = ['robotframework>=3.2.1', 'pytest', 'flask', 'six', 'coverage', 'flake8']
TEST_REQUIRE = ['robotframework>=3.2.1', 'pytest', 'flask', 'six', 'coverage', 'flake8', 'Flask-HTTPAuth==4.8.0']

VERSION = None
version_file = join(dirname(abspath(__file__)), 'src', 'RequestsLibrary', 'version.py')
Expand Down
Loading