Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/fedservice/entity/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
from typing import Optional
from typing import Union

from cryptojwt import as_unicode
from cryptojwt import KeyJar
from cryptojwt.jws.jws import factory
from cryptojwt.utils import importer
from idpyoidc.client.client_auth import client_auth_setup
from idpyoidc.server.util import execute
Expand Down Expand Up @@ -246,10 +244,10 @@ def pick_from_stored_trust_chains(self, entity_id):
return _trust_chains[ta_id]
return _trust_chains[_tas[0]]

def get_payload(self, self_signed_statement):
_jws = as_unicode(self_signed_statement)
_jwt = factory(_jws)
return _jwt.jwt.payload()
# def get_payload(self, self_signed_statement):
# _jws = as_unicode(self_signed_statement)
# _jwt = factory(_jws)
# return _jwt.jwt.payload()

def supported(self):
_supports = self.context.supports()
Expand Down
7 changes: 7 additions & 0 deletions src/fedservice/entity/function/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ def verify_self_signed_signature(token):
return _val


def verify_signature(token, jwks, iss):
_keyjar = KeyJar()
_keyjar = import_jwks(_keyjar, jwks, iss)
_jwt = JWT(key_jar=_keyjar)
return _jwt.unpack(token)


def tree2chains(unit):
res = []
for issuer, branch in unit.items():
Expand Down
84 changes: 68 additions & 16 deletions src/fedservice/entity/function/trust_mark_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,69 @@
from typing import Optional

from cryptojwt import KeyJar
from cryptojwt.exception import Expired
from cryptojwt.jws.jws import factory
from idpyoidc.key_import import import_jwks
from idpyoidc.message import Message

from fedservice import message
from fedservice.entity import apply_policies
from fedservice.entity.function import Function
from fedservice.entity.function import get_payload
from fedservice.entity.function import get_verified_trust_chains
from fedservice.entity.function import verify_signature
from fedservice.entity.function.trust_anchor import get_verified_trust_anchor_statement
from fedservice.entity.utils import get_federation_entity
from fedservice.utils import statement_is_expired

logger = logging.getLogger(__name__)


class TrustMarkVerifier(Function):

"""
The steps are:
1) Verify the trust mark itself. That is; that it contains all the required claims and has not expired.
2) Check that the trust mark issuer is recognized by the trust anchor
3) If delegation is active.
a) verify that the delegator is recognized by the trust anchor
b) verify the signature of the delegation
4) Find a trust chain to the trust mark issuer
5) Verify the signature of the trust mark
"""
def __init__(self, upstream_get: Callable):
Function.__init__(self, upstream_get)

def check_delegation(self, trust_anchor_statement, trust_mark) -> bool:
_owners = trust_anchor_statement.get("trust_mark_owners", {})
if _owners:
_delegator = _owners.get(trust_mark["trust_mark_id"])
else:
_delegator = None

if "delegation" in trust_mark:
# object with two parameters 'sub' and 'jwks'
if _delegator["sub"] != trust_mark["__delegation"]["iss"]:
logger.warning(
f"{trust_mark['__delegation']['iss']} not recognized delegator for {trust_mark['trust_mark_id']}")
return False
try:
_token = verify_signature(trust_mark["delegation"], _delegator["jwks"], _delegator["sub"])
except Exception as err:
logger.exception("Verify delegation signature failed")
return False

# Might want to put _token in trust_mark[verified_claim_name("delegation")]
else:
if _delegator:
return False

return True

def __call__(self,
trust_mark: str,
trust_anchor: str,
check_status: Optional[bool] = False,
entity_id: Optional[str] = '',
):
) -> Optional[Message]:
"""
Verifies that a trust mark is issued by someone in the federation and that
the signing key is a federation key.
Expand All @@ -39,22 +77,37 @@ def __call__(self,
payload = get_payload(trust_mark)
_trust_mark = message.TrustMark(**payload)
# Verify that everything that should be there, are there
_trust_mark.verify()
try:
_trust_mark.verify()
except Expired: # Has it expired ?
return None
except ValueError: # Not correct delegation ?
raise

# Has it expired ?
if statement_is_expired(_trust_mark):
# Get trust anchor information in order to verify the issuer and if needed the delegator.
_federation_entity = get_federation_entity(self)
trust_anchor_statement = get_verified_trust_anchor_statement(_federation_entity, trust_anchor)

# Check delegation
if self.check_delegation(trust_anchor_statement, _trust_mark) == False:
return None

# deal with delegation
if 'delegation' in _trust_mark:
_delegation = self.verify_delegation(_trust_mark, trust_anchor)
if not _delegation:
logger.warning("Could not verify the delegation")
# Trust mark issuers recognized by the trust anchor
_trust_mark_issuers = trust_anchor_statement.get("trust_mark_issuers")
if _trust_mark_issuers is None: # No trust mark issuers are recognized by the trust anchor
return None
_allowed_issuers = _trust_mark_issuers.get(_trust_mark['trust_mark_id'])
if _allowed_issuers is None:
return None

# Get trust chain
# _federation_entity = get_federation_entity(self)
# _chains, entity_conf = collect_trust_chains(_federation_entity, _trust_mark['iss'])
# _trust_chains = verify_trust_chains(_federation_entity, _chains, entity_conf)
if _allowed_issuers == [] or _trust_mark["iss"] in _allowed_issuers:
pass
else: # The trust mark issuer not trusted by the trust anchor
logger.warning(
f'Trust mark issuer {_trust_mark["iss"]} not trusted by the trust anchor for trust mar id: {_trust_mark["trust_mark_id"]}')
return None

# Now time to verify the signature of the trust mark
_trust_chains = get_verified_trust_chains(self, _trust_mark['iss'])
if not _trust_chains:
logger.warning(f"Could not find any verifiable trust chains for {_trust_mark['iss']}")
Expand All @@ -66,7 +119,6 @@ def __call__(self,

# Now try to verify the signature on the trust_mark
# should have the necessary keys
_federation_entity = get_federation_entity(self)
_jwt = factory(trust_mark)
keyjar = _federation_entity.get_attribute('keyjar')

Expand Down
29 changes: 23 additions & 6 deletions src/fedservice/entity/server/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,42 @@ def process_request(self, request=None, **kwargs):
signed_entity_configuration)
_trust_chains = apply_policies(_federation_entity, _trust_chains)

_chain = None
_chosen_chain = None
for trust_chain in _trust_chains:
if _trust_anchor == trust_chain.anchor:
_chain = trust_chain
_chosen_chain = trust_chain
break

if "type" in request:
metadata = {request['type']: _chain.metadata[request['type']]}
metadata = {request['type']: _chosen_chain.metadata[request['type']]}
else:
metadata = _chain.metadata
metadata = _chosen_chain.metadata

# Now for the trust marks
verified_trust_marks = []
for _trust_mark in _chosen_chain.verified_chain[-1].get("trust_marks", []):
_verified_mark = _federation_entity.function.trust_mark_verifier(trust_mark=_trust_mark,
trust_anchor=_trust_anchor)
if _verified_mark:
verified_trust_marks.append({
"trust_mark_id":_verified_mark["trust_mark_id"],
"trust_mark": _trust_mark
})

trust_chain = _federation_entity.function.trust_chain_collector.get_chain(
_chain.iss_path, _trust_anchor, kwargs.get("with_ta_ec"))
_chosen_chain.iss_path, _trust_anchor, kwargs.get("with_ta_ec"))

if verified_trust_marks:
args = {"trust_marks": verified_trust_marks}
else:
args = {}

_jws = create_entity_statement(_federation_entity.entity_id,
sub=request["sub"],
key_jar=_federation_entity.get_attribute('keyjar'),
metadata=metadata,
trust_chain=trust_chain)
trust_chain=trust_chain,
**args)
return {'response_args': _jws}

def response_info(
Expand Down
101 changes: 99 additions & 2 deletions src/fedservice/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
""" Classes and functions used to describe information in an OpenID Connect Federation."""
import json
import logging
from urllib.parse import parse_qs

from cryptojwt.exception import Expired
from cryptojwt.jws.jws import factory
Expand All @@ -12,6 +14,7 @@
from idpyoidc.message import OPTIONAL_LIST_OF_STRINGS
from idpyoidc.message import OPTIONAL_MESSAGE
from idpyoidc.message import REQUIRED_LIST_OF_STRINGS
from idpyoidc.message import ser_any_list
from idpyoidc.message import SINGLE_OPTIONAL_ANY
from idpyoidc.message import SINGLE_OPTIONAL_INT
from idpyoidc.message import SINGLE_OPTIONAL_JSON
Expand All @@ -30,6 +33,7 @@
from idpyoidc.message.oidc import SINGLE_OPTIONAL_BOOLEAN
from idpyoidc.message.oidc import SINGLE_OPTIONAL_DICT

from fedservice.entity.function import get_payload
from fedservice.exception import UnknownCriticalExtension
from fedservice.exception import WrongSubject

Expand All @@ -38,6 +42,33 @@
LOGGER = logging.getLogger(__name__)


def dict_list_deser(val, sformat="dict"):
res = []
if isinstance(val, list):
for v in val:
if isinstance(v, str):
if sformat == "urlencoded":
res.append(parse_qs(v))
else:
res.append(json.loads(v))
elif isinstance(v, dict):
res.append(v)
else:
if isinstance(val, str):
if sformat == "urlencoded":
res = [parse_qs(val)]
else:
res = [json.loads(val)]
elif isinstance(val, dict):
res = [val]

return res


REQUIRED_LIST_OF_DICT = ([dict], True, ser_any_list, dict_list_deser, False)
OPTIONAL_LIST_OF_DICT = ([dict], False, ser_any_list, dict_list_deser, False)


class AuthorizationServerMetadata(Message):
"""Metadata for an OAuth2 Authorization Server. With Federation additions"""
c_param = {
Expand Down Expand Up @@ -94,6 +125,7 @@ def naming_constraints_deser(val, sformat="json"):

SINGLE_OPTIONAL_NAMING_CONSTRAINTS = (Message, False, msg_ser, naming_constraints_deser, False)


class InformationalMetadataExtensions(Message):
c_param = {
"organization_name": SINGLE_OPTIONAL_STRING,
Expand All @@ -103,6 +135,7 @@ class InformationalMetadataExtensions(Message):
"homepage_uri": SINGLE_OPTIONAL_STRING,
}


class FederationEntity(InformationalMetadataExtensions):
"""Class representing Federation Entity metadata."""
c_param = InformationalMetadataExtensions.c_param.copy()
Expand Down Expand Up @@ -467,7 +500,7 @@ class EntityStatement(JsonWebToken):
"crit": OPTIONAL_LIST_OF_STRINGS,
"policy_language_crit": OPTIONAL_LIST_OF_STRINGS,
"source_endpoint": SINGLE_OPTIONAL_STRING,
'trust_marks': SINGLE_OPTIONAL_JSON,
'trust_marks': OPTIONAL_LIST_OF_DICT,
'trust_mark_owners': SINGLE_OPTIONAL_JSON,
'trust_mark_issuers': SINGLE_OPTIONAL_JSON,
#
Expand Down Expand Up @@ -509,6 +542,46 @@ def verify(self, **kwargs):
_tmi = TrustMarkOwners(**_trust_mark_owners)
_tmi.verify()

# This does not verify the signature of the trust marks
# It only checks that the necessary claims are present
_trust_marks = self.get("trust_marks")
if _trust_marks:
for _tm in _trust_marks:
_trust_mark = None
if isinstance(_tm["trust_mark"], str):
_payload = get_payload(_tm["trust_mark"])
if _payload["trust_mark_id"] != _tm["trust_mark_id"]:
raise ValueError("trust_mark_is values does not match")
_trust_mark = TrustMark(**_payload)
elif isinstance(_tm["trust_mark"], dict):
if _tm["trust_mark"]["trust_mark_id"] != _tm["trust_mark_id"]:
raise ValueError("trust_mark_is values does not match")
_trust_mark = TrustMark(**_tm["trust_mark"])
else:
raise ValueError("Trust mark has a format I didn't expect")

_trust_mark.verify()


class TrustMarkDelegation(Message):
c_param = {
"iss": SINGLE_REQUIRED_STRING,
"sub": SINGLE_REQUIRED_STRING,
"trust_mark_id": SINGLE_REQUIRED_STRING,
"iat": SINGLE_REQUIRED_INT,
"exp": SINGLE_OPTIONAL_INT,
"ref": SINGLE_OPTIONAL_STRING
}

def verify(self, **kwargs):
super(TrustMarkDelegation, self).verify(**kwargs)

exp = self.get("exp", 0)
if exp:
_now = utc_time_sans_frac()
if _now > exp: # have passed the time of expiration
raise Expired()


class TrustMark(JsonWebToken):
c_param = JsonWebToken.c_param.copy()
Expand All @@ -531,12 +604,23 @@ def verify(self, **kwargs):
if entity_id is not None and entity_id != self["sub"]:
raise WrongSubject("Mismatch between subject in trust mark and entity_id of entity")

exp = kwargs.get("exp", 0)
exp = self.get("exp", 0)
if exp:
_now = utc_time_sans_frac()
if _now > exp: # have passed the time of expiration
raise Expired()

_delegation_jwt = self.get("delegation")
if _delegation_jwt:
# Not verifying the signature
_delegation = TrustMarkDelegation(**get_payload(_delegation_jwt))
_delegation.verify()
if self.get("iss") != _delegation["sub"]:
raise ValueError("Not the issuer the delegation applies to")
if self.get("trust_mark_id") != _delegation["trust_mark_id"]:
raise ValueError("Not the trust mark id the delegation applies to")
self["__delegation"] = _delegation

return True


Expand Down Expand Up @@ -673,3 +757,16 @@ class WhoResponse(Message):
c_param = {
"entities_to_use": REQUIRED_LIST_OF_STRINGS
}


class JWKSet(Message):
c_param = {
'keys': REQUIRED_LIST_OF_DICT,
"iss": SINGLE_REQUIRED_STRING,
"sub": SINGLE_REQUIRED_STRING,
"exp": SINGLE_OPTIONAL_INT,
"iat": SINGLE_OPTIONAL_INT,
# below should NOT be used
"nbf": SINGLE_OPTIONAL_INT,
"jti": SINGLE_OPTIONAL_STRING,
}
Loading