11#!/usr/bin/env python
22from __future__ import annotations
33
4+ from collections import defaultdict
45from dataclasses import dataclass
56from datetime import datetime , timedelta , timezone
67from enum import Enum , unique
1112from cryptography import x509
1213from cryptography .hazmat ._oid import ExtensionOID
1314from cryptography .hazmat .backends import default_backend
15+ from cryptography .hazmat .primitives import serialization
1416from cryptography .hazmat .primitives .asymmetric import ec , padding , rsa
1517from OpenSSL .SSL import Connection as SSLConnection
1618
@@ -53,15 +55,15 @@ class CRLConfig:
5355 CertRevocationCheckMode .DISABLED
5456 )
5557 allow_certificates_without_crl_url : bool = False
56- connection_timeout_ms : int = 3000
57- read_timeout_ms : int = 3000
58+ connection_timeout_ms : int = 5000
59+ read_timeout_ms : int = 5000 # 5s
5860 cache_validity_time : timedelta = timedelta (hours = 24 )
5961 enable_crl_cache : bool = True
6062 enable_crl_file_cache : bool = True
6163 crl_cache_dir : Path | str | None = None
6264 crl_cache_removal_delay_days : int = 7
6365 crl_cache_cleanup_interval_hours : int = 1
64- crl_cache_start_cleanup : bool = False
66+ crl_cache_start_cleanup : bool = True
6567
6668 @classmethod
6769 def from_connection (cls , sf_connection ) -> CRLConfig :
@@ -176,6 +178,7 @@ class CRLValidator:
176178 def __init__ (
177179 self ,
178180 session_manager : SessionManager | Any ,
181+ trusted_certificates : list [x509 .Certificate ],
179182 cert_revocation_check_mode : CertRevocationCheckMode = CRLConfig .cert_revocation_check_mode ,
180183 allow_certificates_without_crl_url : bool = CRLConfig .allow_certificates_without_crl_url ,
181184 connection_timeout_ms : int = CRLConfig .connection_timeout_ms ,
@@ -191,9 +194,22 @@ def __init__(
191194 self ._cache_validity_time = cache_validity_time
192195 self ._cache_manager = cache_manager or CRLCacheManager .noop ()
193196
197+ # list of trusted CA and their certificates
198+ self ._trusted_ca : dict [x509 .Name , list [x509 .Certificate ]] = defaultdict (list )
199+ for cert in trusted_certificates :
200+ self ._trusted_ca [cert .subject ].append (cert )
201+
202+ # declaration of validate_certificate_is_not_revoked function cache
203+ self ._cache_for__validate_certificate_is_not_revoked : dict [
204+ x509 .Certificate , CRLValidationResult
205+ ] = {}
206+
194207 @classmethod
195208 def from_config (
196- cls , config : CRLConfig , session_manager : SessionManager
209+ cls ,
210+ config : CRLConfig ,
211+ session_manager : SessionManager ,
212+ trusted_certificates : list [x509 .Certificate ],
197213 ) -> CRLValidator :
198214 """
199215 Create a CRLValidator instance from a CRLConfig.
@@ -204,6 +220,7 @@ def from_config(
204220 Args:
205221 config: CRLConfig instance containing CRL-related parameters
206222 session_manager: SessionManager instance
223+ trusted_certificates: List of trusted CA certificates
207224
208225 Returns:
209226 CRLValidator: Configured CRLValidator instance
@@ -244,6 +261,7 @@ def from_config(
244261
245262 return cls (
246263 session_manager = session_manager ,
264+ trusted_certificates = trusted_certificates ,
247265 cert_revocation_check_mode = config .cert_revocation_check_mode ,
248266 allow_certificates_without_crl_url = config .allow_certificates_without_crl_url ,
249267 connection_timeout_ms = config .connection_timeout_ms ,
@@ -272,9 +290,7 @@ def validate_certificate_chains(
272290
273291 if certificate_chains is None or len (certificate_chains ) == 0 :
274292 logger .warning ("Certificate chains are empty" )
275- if self ._cert_revocation_check_mode == CertRevocationCheckMode .ADVISORY :
276- return True
277- return False
293+ return self ._cert_revocation_check_mode == CertRevocationCheckMode .ADVISORY
278294
279295 results = []
280296 for chain in certificate_chains :
@@ -294,24 +310,133 @@ def validate_certificate_chains(
294310 def _validate_single_chain (
295311 self , chain : list [x509 .Certificate ]
296312 ) -> CRLValidationResult :
297- """Validate a single certificate chain"""
313+ """
314+ Returns:
315+ UNREVOKED: If there is a path to any trusted certificate where all certificates are unrevoked.
316+ REVOKED: If all paths to trusted certificates are revoked.
317+ ERROR: If there is a path to any trusted certificate on which none certificate is revoked,
318+ but some certificates can't be verified.
319+ """
298320 # An empty chain is considered an error
299321 if len (chain ) == 0 :
300322 return CRLValidationResult .ERROR
301- # the last certificate of the chain is considered the root and isn't validated
302- results = []
303- for i in range (len (chain ) - 1 ):
304- result = self ._validate_certificate (chain [i ], chain [i + 1 ])
305- if result == CRLValidationResult .REVOKED :
306- return CRLValidationResult .REVOKED
307- results .append (result )
308323
309- if CRLValidationResult .ERROR in results :
324+ subject_certificates : dict [x509 .Name , list [x509 .Certificate ]] = defaultdict (
325+ list
326+ )
327+ for cert in chain :
328+ subject_certificates [cert .subject ].append (cert )
329+ currently_visited_subjects : set [x509 .Name ] = set ()
330+
331+ def traverse_chain (cert : x509 .Certificate ) -> CRLValidationResult | None :
332+ # UNREVOKED - unrevoked path to a trusted certificate found
333+ # REVOKED - all paths are revoked
334+ # ERROR - some certificates on potentially unrevoked paths can't be verified, or no path to a trusted CA is detected
335+ # None - ignore this path (cycle detected)
336+ if self ._is_certificate_trusted_by_os (cert ):
337+ logger .debug ("Found trusted certificate: %s" , cert .subject )
338+ return CRLValidationResult .UNREVOKED
339+
340+ if trusted_ca_issuer := self ._get_trusted_ca_issuer (cert ):
341+ logger .debug ("Certificate signed by trusted CA: %s" , cert .subject )
342+ return self ._validate_certificate_is_not_revoked_with_cache (
343+ cert , trusted_ca_issuer
344+ )
345+
346+ if cert .issuer in currently_visited_subjects :
347+ # cycle detected - invalid path
348+ return None
349+
350+ valid_results : list [tuple [CRLValidationResult , x509 .Certificate ]] = []
351+ for ca_cert in subject_certificates [cert .issuer ]:
352+ if not self ._verify_certificate_signature (cert , ca_cert ):
353+ logger .debug (
354+ "Certificate signature verification failed for %s, looking for other paths" ,
355+ cert ,
356+ )
357+ continue
358+
359+ currently_visited_subjects .add (cert .issuer )
360+ ca_result = traverse_chain (ca_cert )
361+ currently_visited_subjects .remove (cert .issuer )
362+ if ca_result is None :
363+ # ignore invalid path result
364+ continue
365+ if ca_result == CRLValidationResult .UNREVOKED :
366+ # good path found
367+ return self ._validate_certificate_is_not_revoked_with_cache (
368+ cert , ca_cert
369+ )
370+ valid_results .append ((ca_result , ca_cert ))
371+
372+ if len (valid_results ) == 0 :
373+ # "root" certificate not cought by "is_trusted_by_os" check
374+ logger .debug ("No path towards trusted anchor: %s" , cert .subject )
375+ return CRLValidationResult .ERROR
376+
377+ # check if there exists an ERROR path
378+ for ca_result , ca_cert in valid_results :
379+ if ca_result == CRLValidationResult .ERROR :
380+ cert_result = self ._validate_certificate_is_not_revoked_with_cache (
381+ cert , ca_cert
382+ )
383+ if cert_result == CRLValidationResult .REVOKED :
384+ return CRLValidationResult .REVOKED
385+ return CRLValidationResult .ERROR
386+
387+ # no ERROR result found, all paths are REVOKED
388+ return CRLValidationResult .REVOKED
389+
390+ currently_visited_subjects .add (chain [0 ].subject )
391+ error_result = False
392+ revoked_result = False
393+ for cert in subject_certificates [chain [0 ].subject ]:
394+ result = traverse_chain (cert )
395+ if result == CRLValidationResult .UNREVOKED :
396+ return result
397+ error_result |= result == CRLValidationResult .ERROR
398+ revoked_result |= result == CRLValidationResult .REVOKED
399+
400+ if error_result or not revoked_result :
310401 return CRLValidationResult .ERROR
402+ return CRLValidationResult .REVOKED
311403
312- return CRLValidationResult .UNREVOKED
404+ def _is_certificate_trusted_by_os (self , cert : x509 .Certificate ) -> bool :
405+ if cert .subject not in self ._trusted_ca :
406+ return False
407+
408+ cert_der = cert .public_bytes (serialization .Encoding .DER )
409+ return any (
410+ cert_der == trusted_cert .public_bytes (serialization .Encoding .DER )
411+ for trusted_cert in self ._trusted_ca [cert .subject ]
412+ )
313413
314- def _validate_certificate (
414+ def _get_trusted_ca_issuer (self , cert : x509 .Certificate ) -> x509 .Certificate | None :
415+ for trusted_cert in self ._trusted_ca [cert .issuer ]:
416+ if self ._verify_certificate_signature (cert , trusted_cert ):
417+ return trusted_cert
418+ return None
419+
420+ def _verify_certificate_signature (
421+ self , cert : x509 .Certificate , ca_cert : x509 .Certificate
422+ ) -> bool :
423+ try :
424+ cert .verify_directly_issued_by (ca_cert )
425+ return True
426+ except Exception :
427+ return False
428+
429+ def _validate_certificate_is_not_revoked_with_cache (
430+ self , cert : x509 .Certificate , ca_cert : x509 .Certificate
431+ ) -> CRLValidationResult :
432+ # validate certificate can be called multiple times with the same certificate
433+ if cert not in self ._cache_for__validate_certificate_is_not_revoked :
434+ self ._cache_for__validate_certificate_is_not_revoked [cert ] = (
435+ self ._validate_certificate_is_not_revoked (cert , ca_cert )
436+ )
437+ return self ._cache_for__validate_certificate_is_not_revoked [cert ]
438+
439+ def _validate_certificate_is_not_revoked (
315440 self , cert : x509 .Certificate , ca_cert : x509 .Certificate
316441 ) -> CRLValidationResult :
317442 """Validate a single certificate against CRL"""
@@ -343,14 +468,29 @@ def _validate_certificate(
343468
344469 @staticmethod
345470 def _is_short_lived_certificate (cert : x509 .Certificate ) -> bool :
346- """Check if certificate is short-lived (validity <= 5 days)"""
471+ """Check if certificate is short-lived according to CA/Browser Forum definition:
472+ - For certificates issued on or after 15 March 2024 and prior to 15 March 2026:
473+ validity period <= 10 days (864,000 seconds)
474+ - For certificates issued on or after 15 March 2026:
475+ validity period <= 7 days (604,800 seconds)
476+ """
347477 try :
348478 # Use timezone.utc versions to avoid deprecation warnings
479+ issue_date = cert .not_valid_before_utc
349480 validity_period = cert .not_valid_after_utc - cert .not_valid_before_utc
350481 except AttributeError :
351482 # Fallback for older versions
483+ issue_date = cert .not_valid_before
352484 validity_period = cert .not_valid_after - cert .not_valid_before
353- return validity_period .days <= 5
485+
486+ # Convert issue_date to UTC if it's not timezone-aware
487+ if issue_date .tzinfo is None :
488+ issue_date = issue_date .replace (tzinfo = timezone .utc )
489+
490+ march_15_2026 = datetime (2026 , 3 , 15 , tzinfo = timezone .utc )
491+ if issue_date >= march_15_2026 :
492+ return validity_period .total_seconds () <= 604800 # 7 days in seconds
493+ return validity_period .total_seconds () <= 864000 # 10 days in seconds
354494
355495 @staticmethod
356496 def _extract_crl_distribution_points (cert : x509 .Certificate ) -> list [str ]:
@@ -446,7 +586,7 @@ def _check_certificate_against_crl_url(
446586 ca_cert .subject ,
447587 crl_url ,
448588 )
449- # In most cases this indicates a configuration issue, but we'll still try verification
589+ return CRLValidationResult . ERROR
450590
451591 if not self ._verify_crl_signature (crl , ca_cert ):
452592 logger .warning ("CRL signature verification failed for URL: %s" , crl_url )
@@ -545,25 +685,16 @@ def _extract_certificate_chains_from_connection(
545685 Returns:
546686 List of certificate chains, where each chain is a list of x509.Certificate objects
547687 """
548- from OpenSSL .crypto import FILETYPE_ASN1 , dump_certificate
549-
550688 try :
551- cert_chain = connection .get_peer_cert_chain ()
689+ # Convert OpenSSL certificates to cryptography x509 certificates
690+ cert_chain = connection .get_peer_cert_chain (as_cryptography = True )
552691 if not cert_chain :
553692 logger .debug ("No certificate chain found in connection" )
554693 return []
555-
556- # Convert OpenSSL certificates to cryptography x509 certificates
557- x509_chain = []
558- for cert_openssl in cert_chain :
559- cert_der = dump_certificate (FILETYPE_ASN1 , cert_openssl )
560- cert_x509 = x509 .load_der_x509_certificate (cert_der , default_backend ())
561- x509_chain .append (cert_x509 )
562-
563694 logger .debug (
564- "Extracted %d certificates for CRL validation" , len (x509_chain )
695+ "Extracted %d certificates for CRL validation" , len (cert_chain )
565696 )
566- return [x509_chain ] # Return as a single chain
697+ return [cert_chain ] # Return as a single chain
567698
568699 except Exception as e :
569700 logger .warning (
0 commit comments