diff --git a/passageidentity/auth.py b/passageidentity/auth.py new file mode 100644 index 0000000..0c0a2c5 --- /dev/null +++ b/passageidentity/auth.py @@ -0,0 +1,84 @@ +"""Provides the Auth class for interacting with the Passage API.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import jwt +import jwt.algorithms + +from passageidentity.errors import PassageError +from passageidentity.helper import fetch_app +from passageidentity.openapi_client.api.magic_links_api import MagicLinksApi +from passageidentity.openapi_client.exceptions import ApiException +from passageidentity.openapi_client.models.create_magic_link_request import CreateMagicLinkRequest + +if TYPE_CHECKING: + from passageidentity.openapi_client.models.magic_link import MagicLink + +CreateMagicLinkArgs = CreateMagicLinkRequest + + +class Auth: + """Auth class for handling operations to authenticate and validate JWTs.""" + + def __init__(self, app_id: str, request_headers: dict[str, str]) -> None: + """Initialize the Auth class with the app ID and request headers.""" + self.app_id = app_id + self.request_headers = request_headers + self.jwks = jwt.PyJWKClient( + f"https://auth.passage.id/v1/apps/{self.app_id}/.well-known/jwks.json", + # must set a user agent to avoid 403 from CF + headers={"User-Agent": "passageidentity/python"}, + ) + self.app = fetch_app(self.app_id) + + self.magic_links_api = MagicLinksApi() + + def validate_jwt(self, token: str) -> str: + """Verify the JWT and return the user ID for the authenticated user, or throw a PassageError.""" + try: + kid = jwt.get_unverified_header(token)["kid"] + public_key = self.jwks.get_signing_key(kid) + claims = jwt.decode( + token, + public_key, + audience=[self.app_id] if self.app["hosted"] else self.app["auth_origin"], + algorithms=["RS256"], + ) + + return claims["sub"] + except Exception as e: + msg = f"JWT is not valid: {e}" + raise PassageError(msg) from e + + def create_magic_link(self, args: CreateMagicLinkArgs) -> MagicLink: + """Create a Magic Link for your app.""" + magic_link_req = {} + args_dict = args.to_dict() if isinstance(args, CreateMagicLinkRequest) else args + + magic_link_req["user_id"] = args_dict.get("user_id") or "" + magic_link_req["email"] = args_dict.get("email") or "" + magic_link_req["phone"] = args_dict.get("phone") or "" + + magic_link_req["language"] = args_dict.get("language") or "" + magic_link_req["magic_link_path"] = args_dict.get("magic_link_path") or "" + magic_link_req["redirect_url"] = args_dict.get("redirect_url") or "" + magic_link_req["send"] = args_dict.get("send") or False + magic_link_req["ttl"] = args_dict.get("ttl") or 0 + magic_link_req["type"] = args_dict.get("type") or "login" + + if args_dict.get("email"): + magic_link_req["channel"] = args_dict.get("channel") or "email" + elif args_dict.get("phone"): + magic_link_req["channel"] = args_dict.get("channel") or "phone" + + try: + return self.magic_links_api.create_magic_link( + self.app_id, + magic_link_req, # type: ignore[arg-type] + _headers=self.request_headers, + ).magic_link + except ApiException as e: + msg = "Could not create a magic link for this app" + raise PassageError.from_response_error(e, msg) from e diff --git a/passageidentity/errors.py b/passageidentity/errors.py index 0bdce8b..039e5d9 100644 --- a/passageidentity/errors.py +++ b/passageidentity/errors.py @@ -2,22 +2,45 @@ from __future__ import annotations +from typing import TYPE_CHECKING + +import typing_extensions + +if TYPE_CHECKING: + from passageidentity.openapi_client.exceptions import ApiException + class PassageError(Exception): """Error class for handling Passage errors.""" + @typing_extensions.deprecated( + "This should only be constructed by the Passage SDK. Use this type just for type checking.", + ) def __init__( self, message: str, status_code: int | None = None, status_text: str | None = None, body: dict | None = None, + error_code: str | None = None, ) -> None: """Initialize the error with a message, status code, status text, and optional body.""" self.message = message self.status_code = status_code self.status_text = status_text + + self.error_code = error_code + self.error = None + if body is not None: self.error = body["error"] - else: - self.error = None + self.error_code = body["code"] + + @classmethod + def from_response_error(cls, response_error: ApiException, message: str | None = None) -> PassageError: + """Initialize the error with a response body and optional message.""" + error_code = response_error.body["code"] if response_error.body else None + error_msg = response_error.body["error"] if response_error.body else None + msg = ": ".join(filter(None, [message, error_msg])) + + return cls(message=msg, status_code=response_error.status, error_code=error_code) diff --git a/passageidentity/helper.py b/passageidentity/helper.py index 3f7d0c1..5941cfe 100644 --- a/passageidentity/helper.py +++ b/passageidentity/helper.py @@ -9,7 +9,6 @@ from passageidentity.errors import PassageError BEARER_PATTERN = r"Bearer ([^\s,]+)" -BASE_URL = "https://api.passage.id/v1/apps/" def extract_token(auth_header: str) -> str: @@ -50,9 +49,10 @@ def get_auth_token_from_request(request: Request, auth_strategy: int) -> str: def fetch_app(app_id: str) -> dict: """Fetch the public key for the given app id from Passage.""" # unauthenticated request to get the public key - r = requests.get(BASE_URL + app_id) + r = requests.get(f"https://api.passage.id/v1/apps/{app_id}") if r.status_code != HTTPStatus.OK: - raise PassageError("Could not fetch app information for app id " + app_id) + msg = f"Could not fetch app information for app id {app_id}" + raise PassageError(msg) return r.json()["app"] diff --git a/passageidentity/passage.py b/passageidentity/passage.py index 3dcd6e5..54d2584 100644 --- a/passageidentity/passage.py +++ b/passageidentity/passage.py @@ -2,24 +2,17 @@ from __future__ import annotations -import json -import warnings -from http import HTTPStatus from typing import TYPE_CHECKING -import jwt -import jwt.algorithms +import typing_extensions -from passageidentity import requests +from passageidentity.auth import Auth from passageidentity.errors import PassageError -from passageidentity.helper import fetch_app, get_auth_token_from_request +from passageidentity.helper import get_auth_token_from_request +from passageidentity.user import User from .openapi_client.api import ( AppsApi, - MagicLinksApi, - TokensApi, - UserDevicesApi, - UsersApi, ) if TYPE_CHECKING: @@ -35,9 +28,6 @@ WebAuthnDevices, ) -AUTH_CACHE = {} -BASE_URL = "https://api.passage.id/v1/apps/" - class Passage: """Passage class for interacting with the Passage API.""" @@ -46,93 +36,36 @@ class Passage: HEADER_AUTH = 2 def __init__(self, app_id: str, api_key: str = "", auth_strategy: int = COOKIE_AUTH) -> None: - """When a Passage object is created, fetch the public key from the cache or make an API request to get it.""" + """Initialize a new Passage instance.""" + if not app_id: + msg = "A Passage app ID is required. Please include {app_id=YOUR_APP_ID, api_key=YOUR_API_KEY}." + raise PassageError(msg) + self.app_id: str = app_id self.passage_apikey: str = api_key self.auth_strategy: int = auth_strategy - self.request_headers = {"Authorization": "Bearer " + self.passage_apikey} - - if not app_id: - msg = "Passage App ID must be provided" - raise PassageError(msg) + self.request_headers = {"Authorization": f"Bearer {api_key}"} - # if the pubkey exists in the cache, use that to avoid making requests - if app_id in AUTH_CACHE: - self.jwks: dict[str, list] = AUTH_CACHE[app_id]["jwks"] - self.auth_origin: str = AUTH_CACHE[app_id]["auth_origin"] - else: - self.__refresh_auth_cache() - - def __fetch_jwks(self) -> dict[str, list]: - """Fetch JWKs for the app.""" - r = requests.get( - f"https://auth.passage.id/v1/apps/{self.app_id}/.well-known/jwks.json", - ) - - if r.status_code != HTTPStatus.OK: - raise PassageError( - "Could not fetch JWKs for app id " + self.app_id, - r.status_code, - r.reason, - r.json(), - ) - - jwks = r.json()["keys"] - - # translate the JWKS into map for O(1) access - jwk_items = {} - for jwk in jwks: - jwk_items[jwk["kid"]] = jwk - - return jwk_items - - def __fetch_hosted(self) -> bool: - """Fetch whether the app is hosted.""" - r = requests.get( - f"https://api.passage.id/v1/apps/{self.app_id}", - api_key=self.passage_apikey, - ) - - if r.status_code != HTTPStatus.OK: - raise PassageError( - "Could not fetch app info for app id " + self.app_id, - r.status_code, - r.reason, - r.json(), - ) - - return r.json()["app"]["hosted"] + self.auth = Auth(app_id, self.request_headers) + self.user = User(app_id, self.request_headers) + @typing_extensions.deprecated("Passage.validateJwt() will be deprecated. Use Passage.auth.validate_jwt() instead.") def validateJwt(self, token: str): # noqa: ANN201, N802 """Verify the JWT and return the user ID for the authenticated user, or throw a PassageError. Takes the place of the deprecated authenticateRequest() function. """ - return self.authenticateJWT(token) - - def __refresh_auth_cache(self) -> None: - self.auth_origin = fetch_app(self.app_id)["auth_origin"] - self.jwks = self.__fetch_jwks() - hosted = self.__fetch_hosted() - - AUTH_CACHE[self.app_id] = { - "jwks": self.jwks, - "auth_origin": self.auth_origin, - "hosted": hosted, - } + return self.auth.validate_jwt(token) + @typing_extensions.deprecated( + "Passage.authenticateRequest() will be deprecated. Use Passage.auth.validate_jwt() instead.", + ) def authenticateRequest(self, request: Request) -> str | PassageError: # noqa: N802 """Authenticate a Flask or Django request that uses Passage for authentication. This function will verify the JWT and return the user ID for the authenticated user, or throw a PassageError. """ - warnings.warn( - "Passage.authenticateRequest() is deprecated. Use Passage.authenticateJWT() instead.", - DeprecationWarning, - stacklevel=2, - ) - # check for authorization header token = get_auth_token_from_request(request, self.auth_strategy) if not token: @@ -141,11 +74,14 @@ def authenticateRequest(self, request: Request) -> str | PassageError: # noqa: # load and parse the JWT try: - return self.authenticateJWT(token) + return self.auth.validate_jwt(token) except Exception as e: msg = f"JWT is not valid: {e}" raise PassageError(msg) from e + @typing_extensions.deprecated( + "Passage.authenticateJWT() will be deprecated. Use Passage.auth.validate_jwt() instead.", + ) def authenticateJWT(self, token: str) -> str | PassageError: # noqa: N802 """Authenticate a JWT from Passage. @@ -153,33 +89,11 @@ def authenticateJWT(self, token: str) -> str | PassageError: # noqa: N802 This function can be used to authenticate JWTs from Passage if they are not sent in a typical cookie or authorization header. """ - # load and parse the JWT - - try: - hosted = AUTH_CACHE[self.app_id]["hosted"] - kid = jwt.get_unverified_header(token)["kid"] - jwk = AUTH_CACHE[self.app_id]["jwks"][kid] - - # if the JWK can't be found, they might need to udpate the JWKS for this Passage intance - # re-fetch the JWKS and try again - if not jwk: - _ = self.__refresh_auth_cache - hosted = AUTH_CACHE[self.app_id]["hosted"] - kid = jwt.get_unverified_header(token)["kid"] - jwk = AUTH_CACHE[self.app_id]["jwks"][kid] - - public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(jwk)) - claims = jwt.decode( - token, - public_key, # type: ignore[arg-type] - audience=[self.app_id] if hosted else self.auth_origin, - algorithms=["RS256"], - ) - return claims["sub"] - except Exception as e: - msg = f"JWT is not valid: {e}" - raise PassageError(msg) from e + return self.auth.validate_jwt(token) + @typing_extensions.deprecated( + "Passage.createMagicLink() will be deprecated. Use Passage.auth.create_magic_link() instead.", + ) def createMagicLink( # noqa: N802 self, magicLinkAttributes: CreateMagicLinkRequest, # noqa: N803 @@ -190,240 +104,105 @@ def createMagicLink( # noqa: N802 msg = "No Passage API key provided." raise PassageError(msg) - magic_link_req = {} - - magic_link_req["user_id"] = magicLinkAttributes.get("user_id") or "" # type: ignore # noqa: PGH003 - magic_link_req["email"] = magicLinkAttributes.get("email") or "" # type: ignore # noqa: PGH003 - magic_link_req["phone"] = magicLinkAttributes.get("phone") or "" # type: ignore # noqa: PGH003 - - magic_link_req["language"] = magicLinkAttributes.get("language") or "" # type: ignore # noqa: PGH003 - magic_link_req["magic_link_path"] = ( - magicLinkAttributes.get("magic_link_path") or "" # type: ignore # noqa: PGH003 - ) - magic_link_req["redirect_url"] = magicLinkAttributes.get("redirect_url") or "" # type: ignore # noqa: PGH003 - magic_link_req["send"] = magicLinkAttributes.get("send") or False # type: ignore # noqa: PGH003 - magic_link_req["ttl"] = magicLinkAttributes.get("ttl") or 0 # type: ignore # noqa: PGH003 - magic_link_req["type"] = magicLinkAttributes.get("type") or "login" # type: ignore # noqa: PGH003 - - if magicLinkAttributes.get("email"): # type: ignore # noqa: PGH003 - magic_link_req["channel"] = magicLinkAttributes.get("channel") or "email" # type: ignore # noqa: PGH003 - elif magicLinkAttributes.get("phone"): # type: ignore # noqa: PGH003 - magic_link_req["channel"] = magicLinkAttributes.get("channel") or "phone" # type: ignore # noqa: PGH003 - - try: - client = MagicLinksApi() - return client.create_magic_link( - self.app_id, - magic_link_req, # type: ignore[arg-type] - _headers=self.request_headers, - ).magic_link # type: ignore[attr-defined] - except Exception as e: - msg = f"Failed to create magic link: {e}" - raise PassageError(msg) from e + return self.auth.create_magic_link(magicLinkAttributes) # type: ignore[attr-defined] + @typing_extensions.deprecated("Passage.getApp() will be removed without replacement.") def getApp(self) -> AppInfo | PassageError: # noqa: N802 """Use Passage API to get info for their app.""" client = AppsApi() return client.get_app(self.app_id).app + @typing_extensions.deprecated("Passage.getUser() will be deprecated. Use Passage.user.get() instead.") def getUser(self, user_id: str) -> UserInfo | PassageError: # noqa: N802 """Use Passage API to get info for a user, look up by user ID.""" - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = UsersApi() - return client.get_user( - self.app_id, - user_id, - _headers=self.request_headers, - ).user - except Exception as e: - msg = f"Failed to fetch user data: {e}" - raise PassageError(msg) from e + return self.user.get(user_id) + @typing_extensions.deprecated( + "Passage.getUserByIdentifier() will be deprecated. Use Passage.user.get_by_identifier() instead.", + ) def getUserByIdentifier(self, userIdentifier: str) -> UserInfo | PassageError: # noqa: N802, N803 """Use Passage API to get info for a user, look up by user identifier.""" - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = UsersApi() - users = client.list_paginated_users( - self.app_id, - limit=1, - identifier=userIdentifier.lower(), - _headers=self.request_headers, - ).users - except Exception as e: - msg = f"Failed to fetch user data: {e}" - raise PassageError(msg) from e - - if len(users) == 0: - msg = "Failed to find user data" - raise PassageError(msg) - - return self.getUser(users[0].id) + return self.user.get_by_identifier(userIdentifier) + @typing_extensions.deprecated( + "Passage.listUserDevices() will be deprecated. Use Passage.user.list_devices() instead.", + ) def listUserDevices( # noqa: N802 self, user_id: str, ) -> list[WebAuthnDevices] | PassageError: """Use Passage API to list user devices, look up by user ID.""" - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = UserDevicesApi() - return client.list_user_devices( - self.app_id, - user_id, - _headers=self.request_headers, - ).devices - except Exception as e: - msg = f"Failed to list user's devices: {e}" - raise PassageError(msg) from e + return self.user.list_devices(user_id) + @typing_extensions.deprecated( + "Passage.deleteUserDevice() will be deprecated. Use Passage.user.revoke_device() instead.", + ) def deleteUserDevice( # noqa: N802 self, user_id: str, device_id: str, ) -> bool | PassageError: """Use Passage API to revoke user devices, look up by user ID.""" - return self.revokeUserDevice(user_id, device_id) + self.user.revoke_device(user_id, device_id) + return True + @typing_extensions.deprecated( + "Passage.revokeUserDevice() will be deprecated. Use Passage.user.revoke_device() instead.", + ) def revokeUserDevice( # noqa: N802 self, user_id: str, device_id: str, ) -> bool | PassageError: """Use Passage API to revoke user devices, look up by user ID.""" - warnings.warn( - "Passage.revokeUserDevice() is deprecated. Use Passage.deleteUserDevice() instead.", - DeprecationWarning, - stacklevel=2, - ) - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = UserDevicesApi() - client.delete_user_devices( - self.app_id, - user_id, - device_id, - _headers=self.request_headers, - ) - return True # noqa: TRY300 - except Exception as e: - msg = f"Failed to revoke user device: {e}" - raise PassageError(msg) from e + self.user.revoke_device(user_id, device_id) + return True + @typing_extensions.deprecated( + "Passage.revokeUserRefreshTokens() will be deprecated. Use Passage.user.revoke_refresh_tokens() instead.", + ) def revokeUserRefreshTokens(self, user_id: str) -> bool | PassageError: # noqa: N802 """Use Passage API to revoke all of a user's refresh tokens, look up by user ID.""" - return self.signOut(user_id) + self.user.revoke_refresh_tokens(user_id) + return True + @typing_extensions.deprecated( + "Passage.signOut() will be deprecated. Use Passage.user.revoke_refresh_tokens() instead.", + ) def signOut( # noqa: N802 self, user_id: str, ) -> bool | PassageError: """Use Passage API to revoke all of a user's refresh tokens, look up by user ID.""" - warnings.warn( - "Passage.signOut() is deprecated. Use Passage.revokeUserRefreshTokens() instead.", - DeprecationWarning, - stacklevel=2, - ) - - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = TokensApi() - client.revoke_user_refresh_tokens( - self.app_id, - user_id, - _headers=self.request_headers, - ) - return True # noqa: TRY300 - except Exception as e: - msg = f"Failed to revoke user's refresh tokens: {e}" - raise PassageError(msg) from e + self.user.revoke_refresh_tokens(user_id) + return True + @typing_extensions.deprecated("Passage.activateUser() will be deprecated. Use Passage.user.activate() instead.") def activateUser(self, user_id: str) -> UserInfo | PassageError: # noqa: N802 """Activate Passage User.""" - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = UsersApi() - return client.activate_user( - self.app_id, - user_id, - _headers=self.request_headers, - ).user - except Exception as e: - msg = f"Failed activate user: {e}" - raise PassageError(msg) from e + return self.user.activate(user_id) + @typing_extensions.deprecated("Passage.deactivateUser() will be deprecated. Use Passage.user.deactivate() instead.") def deactivateUser(self, user_id: str) -> UserInfo | PassageError: # noqa: N802 """Deactivate Passage User.""" - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = UsersApi() - return client.deactivate_user( - self.app_id, - user_id, - _headers=self.request_headers, - ).user - except Exception as e: - msg = f"Failed deactivate user: {e}" - raise PassageError(msg) from e + return self.user.deactivate(user_id) + @typing_extensions.deprecated("Passage.updateUser() will be deprecated. Use Passage.user.update() instead.") def updateUser( # noqa: N802 self, user_id: str, attributes: UpdateUserRequest, ) -> UserInfo | PassageError: """Update Passage User.""" - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = UsersApi() - return client.update_user( - self.app_id, - user_id, - attributes, - _headers=self.request_headers, - ).user - except Exception as e: - msg = f"Failed to update user attributes: {e}" - raise PassageError(msg) from e + return self.user.update(user_id, attributes) + @typing_extensions.deprecated("Passage.deleteUser() will be deprecated. Use Passage.user.delete() instead.") def deleteUser(self, user_id: str) -> bool | PassageError: # noqa: N802 """Delete Passage User.""" - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = UsersApi() - client.delete_user(self.app_id, user_id, _headers=self.request_headers) - return True # noqa: TRY300 - except Exception as e: - msg = f"Failed to delete user: {e}" - raise PassageError(msg) from e + self.user.delete(user_id) + return True + @typing_extensions.deprecated("Passage.createUser() will be deprecated. Use Passage.user.create() instead.") def createUser( # noqa: N802 self, userAttributes: CreateUserRequest, # noqa: N803 @@ -433,18 +212,4 @@ def createUser( # noqa: N802 msg = "either phone or email must be provided to create the user" raise PassageError(msg) - # if no api key, fail - if self.passage_apikey == "": - msg = "No Passage API key provided." - raise PassageError(msg) - - try: - client = UsersApi() - return client.create_user( - self.app_id, - userAttributes, - _headers=self.request_headers, - ).user - except Exception as e: - msg = f"Failed to create user: {e}" - raise PassageError(msg) from e + return self.user.create(userAttributes) diff --git a/passageidentity/user.py b/passageidentity/user.py new file mode 100644 index 0000000..197148d --- /dev/null +++ b/passageidentity/user.py @@ -0,0 +1,131 @@ +"""Provides the User class for interacting with the Passage API.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from passageidentity.errors import PassageError +from passageidentity.openapi_client.exceptions import ApiException + +from .openapi_client.api import ( + TokensApi, + UserDevicesApi, + UsersApi, +) +from .openapi_client.models import ( + CreateUserRequest, + UpdateUserRequest, + UserInfo, +) + +if TYPE_CHECKING: + from .openapi_client.models import ( + WebAuthnDevices, + ) + +PassageUser = UserInfo +CreateUserArgs = CreateUserRequest +UpdateUserArgs = UpdateUserRequest + + +class User: + """User class for handling operations to get and update user information.""" + + def __init__(self, app_id: str, request_headers: dict[str, str]) -> None: + """Initialize the User class with the app ID and request headers.""" + self.app_id = app_id + self.request_headers = request_headers + self.users_api = UsersApi() + self.user_devices_api = UserDevicesApi() + self.tokens_api = TokensApi() + + def get(self, user_id: str) -> PassageUser: + """Get a user's object using their user ID.""" + try: + return self.users_api.get_user(self.app_id, user_id, _headers=self.request_headers).user + except ApiException as e: + msg = "Could not fetch user" + raise PassageError.from_response_error(e, msg) from e + + def get_by_identifier(self, identifier: str) -> PassageUser: + """Get a user's object using their user identifier.""" + try: + users = self.users_api.list_paginated_users( + self.app_id, + limit=1, + identifier=identifier.lower(), + _headers=self.request_headers, + ).users + except ApiException as e: + msg = "Could not fetch user by identifier" + raise PassageError.from_response_error(e, msg) from e + + if len(users) == 0: + msg = "Could not find user with identifier: {identifier}" + raise PassageError(msg) + + return self.get(users[0].id) + + def activate(self, user_id: str) -> PassageUser: + """Activate a user using their user ID.""" + try: + return self.users_api.activate_user(self.app_id, user_id, _headers=self.request_headers).user + except ApiException as e: + msg = "Could not activate user" + raise PassageError.from_response_error(e, msg) from e + + def deactivate(self, user_id: str) -> PassageUser: + """Deactivate a user using their user ID.""" + try: + return self.users_api.deactivate_user(self.app_id, user_id, _headers=self.request_headers).user + except ApiException as e: + msg = "Could not deactivate user" + raise PassageError.from_response_error(e, msg) from e + + def update(self, user_id: str, args: UpdateUserArgs) -> PassageUser: + """Update a user.""" + try: + return self.users_api.update_user(self.app_id, user_id, args, _headers=self.request_headers).user + except ApiException as e: + msg = "Could not update user" + raise PassageError.from_response_error(e, msg) from e + + def create(self, args: CreateUserArgs) -> PassageUser: + """Create a user.""" + try: + return self.users_api.create_user(self.app_id, args, _headers=self.request_headers).user + except ApiException as e: + msg = "Could not create user" + raise PassageError.from_response_error(e, msg) from e + + def delete(self, user_id: str) -> None: + """Delete a user using their user ID.""" + try: + self.users_api.delete_user(self.app_id, user_id, _headers=self.request_headers) + except ApiException as e: + msg = "Could not delete user" + raise PassageError.from_response_error(e, msg) from e + + def list_devices(self, user_id: str) -> list[WebAuthnDevices]: + """Get a user's devices using their user ID.""" + try: + return self.user_devices_api.list_user_devices(self.app_id, user_id, _headers=self.request_headers).devices + except ApiException as e: + msg = "Could not fetch user's devices" + raise PassageError.from_response_error(e, msg) from e + + def revoke_device(self, user_id: str, device_id: str) -> None: + """Revoke a user's device using their user ID and the device ID.""" + try: + self.user_devices_api.delete_user_devices(self.app_id, user_id, device_id, _headers=self.request_headers) + except ApiException as e: + msg = "Could not revoke user's device" + raise PassageError.from_response_error(e, msg) from e + + def revoke_refresh_tokens(self, user_id: str) -> None: + """Revokes all of a user's Refresh Tokens using their User ID.""" + try: + self.tokens_api.revoke_user_refresh_tokens(self.app_id, user_id, _headers=self.request_headers) + except ApiException as e: + msg = "Could not revoke user's refresh tokens" + raise PassageError.from_response_error(e, msg) from e diff --git a/tests/authenticate_test.py b/tests/authenticate_test.py index 18dd795..cb4279a 100644 --- a/tests/authenticate_test.py +++ b/tests/authenticate_test.py @@ -8,12 +8,11 @@ from passageidentity import PassageError from passageidentity.openapi_client.models.app_info import AppInfo from passageidentity.openapi_client.models.magic_link import MagicLink -from passageidentity.openapi_client.models.update_user_request import UpdateUserRequest -from passageidentity.openapi_client.models.user_info import UserInfo from passageidentity.passage import Passage load_dotenv() f = Faker() + PASSAGE_USER_ID = os.environ.get("PASSAGE_USER_ID") or "" PASSAGE_APP_ID = os.environ.get("PASSAGE_APP_ID") or "" PASSAGE_API_KEY = os.environ.get("PASSAGE_API_KEY") or "" @@ -38,11 +37,6 @@ def test_validate_jwt() -> None: assert user == PASSAGE_USER_ID -def test_fetch_jwks() -> None: - psg = Passage(PASSAGE_APP_ID, auth_strategy=Passage.HEADER_AUTH) - assert len(psg.jwks) > 0 - - def test_get_app() -> None: psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) app = cast(AppInfo, psg.getApp()) @@ -62,148 +56,6 @@ def test_create_magic_link() -> None: assert magic_link.ttl == 12 # type: ignore[attr-defined] -def test_get_user_info_valid() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - user = cast(UserInfo, psg.getUser(PASSAGE_USER_ID)) - assert user.id == PASSAGE_USER_ID - - -def test_get_user_info_by_identifier_valid() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - email = f.email() - new_user = cast(UserInfo, psg.createUser({"email": email})) # type: ignore[arg-type] - assert new_user.email == email - - user_by_identifier = cast(UserInfo, psg.getUserByIdentifier(email)) - assert user_by_identifier.id == new_user.id - - user = cast(UserInfo, psg.getUser(new_user.id)) - assert user.id == new_user.id - - assert user_by_identifier == user - assert psg.deleteUser(new_user.id) - - -def test_get_user_info_by_identifier_valid_upper_case() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - email = f.email() - new_user = cast(UserInfo, psg.createUser({"email": email})) # type: ignore[arg-type] - assert new_user.email == email - - user_by_identifier = cast(UserInfo, psg.getUserByIdentifier(email.upper())) - assert user_by_identifier.id == new_user.id - - user = cast(UserInfo, psg.getUser(new_user.id)) - assert user.id == new_user.id - - assert user_by_identifier == user - assert psg.deleteUser(new_user.id) - - -def test_get_user_info_by_identifier_phone_valid() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - phone = "+15005550030" - new_user = cast(UserInfo, psg.createUser({"phone": phone})) # type: ignore[arg-type] - assert new_user.phone == phone - - user_by_identifier = cast(UserInfo, psg.getUserByIdentifier(phone)) - assert user_by_identifier.id == new_user.id - - user = cast(UserInfo, psg.getUser(new_user.id)) - assert user.id == new_user.id - - assert user_by_identifier == user - assert psg.deleteUser(new_user.id) - - -def test_get_user_info_by_identifier_error() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - with pytest.raises(PassageError, match="Failed to find user data"): - psg.getUserByIdentifier("error@passage.id") - - -def test_activate_user() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - user = cast(UserInfo, psg.activateUser(PASSAGE_USER_ID)) - assert user.status == "active" - - -def test_deactivate_user() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - user = cast(UserInfo, psg.getUser(PASSAGE_USER_ID)) - user = cast(UserInfo, psg.deactivateUser(user.id)) - assert user.status == "inactive" - - -def test_list_user_devices() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - devices = cast(list, psg.listUserDevices(PASSAGE_USER_ID)) - assert len(devices) == 2 - - -def test_update_user_phone() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - phone = "+15005550021" - new_user = cast(UserInfo, psg.createUser({"phone": phone})) # type: ignore[arg-type] - - phone = "+15005550022" - user = cast(UserInfo, psg.updateUser(new_user.id, {"phone": phone})) # type: ignore[arg-type] - assert user.phone == phone - assert psg.deleteUser(new_user.id) - - -def test_update_user_email() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - email = f.email() - req = UpdateUserRequest(email=email) - user = cast(UserInfo, psg.updateUser(PASSAGE_USER_ID, req)) - assert user.email == email - - -def test_update_user_with_metadata() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - email = f.email() - user = cast(UserInfo, psg.updateUser(PASSAGE_USER_ID, {"email": email, "user_metadata": {"example1": "qwe"}})) # type: ignore[arg-type] - assert user.email == email - assert user.user_metadata["example1"] == "qwe" # type: ignore[index] - - user = cast(UserInfo, psg.updateUser(PASSAGE_USER_ID, {"email": email, "user_metadata": {"example1": "asd"}})) # type: ignore[arg-type] - assert user.email == email - assert user.user_metadata["example1"] == "asd" # type: ignore[index] - - -def test_create_user_with_metadata() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - email = f.email() - user = cast(UserInfo, psg.createUser({"email": email, "user_metadata": {"example1": "qwe"}})) # type: ignore[arg-type] - assert user.email == email - assert user.user_metadata["example1"] == "qwe" # type: ignore[index] - assert psg.deleteUser(user.id) - - -def test_get_user_info_user_does_not_exist() -> None: - pass - - -def test_create_and_delete_user() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - email = f.email() - new_user = cast(UserInfo, psg.createUser({"email": email})) # type: ignore[arg-type] - assert new_user.email == email - assert psg.deleteUser(new_user.id) - - def test_smart_link_valid() -> None: psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) @@ -211,15 +63,3 @@ def test_smart_link_valid() -> None: magic_link = cast(MagicLink, psg.createMagicLink({"email": email})) # type: ignore[arg-type] assert magic_link.identifier == email assert not magic_link.activated - - -def test_sign_out() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - assert psg.signOut(PASSAGE_USER_ID) - - -def test_revoke_user_refresh_tokens() -> None: - psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) - - assert psg.revokeUserRefreshTokens(PASSAGE_USER_ID) diff --git a/tests/errors_test.py b/tests/errors_test.py index 745a338..cd6fa24 100644 --- a/tests/errors_test.py +++ b/tests/errors_test.py @@ -1,12 +1,20 @@ from passageidentity import PassageError +from passageidentity.openapi_client.exceptions import ApiException + + +class MockApiException(ApiException): + def __init__(self, status: int, body: dict) -> None: + self.status = status + self.body = body def test_error_with_all_values() -> None: - error = PassageError("some message", 400, "Bad Request", {"error": "some error"}) + error = PassageError("some message", 400, "Bad Request", {"error": "some error", "code": "some_error_code"}) assert error.message == "some message" assert error.status_code == 400 assert error.status_text == "Bad Request" assert error.error == "some error" + assert error.error_code == "some_error_code" def test_error_with_only_message() -> None: @@ -15,3 +23,32 @@ def test_error_with_only_message() -> None: assert error.status_code is None assert error.status_text is None assert error.error is None + assert error.error_code is None + + +def test_from_response_error() -> None: + response_error = MockApiException( + status=400, + body={"error": "some error", "code": "some_error_code"}, + ) + + error = PassageError.from_response_error(response_error, "some message") + assert error.message == "some message: some error" + assert error.status_code == 400 + assert error.error_code == "some_error_code" + assert error.status_text is None + assert error.error is None + + +def test_from_response_error_without_message() -> None: + response_error = MockApiException( + status=400, + body={"error": "some error", "code": "some_error_code"}, + ) + + error = PassageError.from_response_error(response_error) + assert error.message == "some error" + assert error.status_code == 400 + assert error.error_code == "some_error_code" + assert error.status_text is None + assert error.error is None diff --git a/tests/user_test.py b/tests/user_test.py new file mode 100644 index 0000000..5bcdbce --- /dev/null +++ b/tests/user_test.py @@ -0,0 +1,169 @@ +import os +from typing import cast + +import pytest +from dotenv import load_dotenv +from faker import Faker + +from passageidentity.errors import PassageError +from passageidentity.openapi_client.models.update_user_request import UpdateUserRequest +from passageidentity.openapi_client.models.user_info import UserInfo +from passageidentity.passage import Passage + +load_dotenv() +f = Faker() + +PASSAGE_USER_ID = os.environ.get("PASSAGE_USER_ID") or "" +PASSAGE_APP_ID = os.environ.get("PASSAGE_APP_ID") or "" +PASSAGE_API_KEY = os.environ.get("PASSAGE_API_KEY") or "" +PASSAGE_AUTH_TOKEN = os.environ.get("PASSAGE_AUTH_TOKEN") or "" + + +def test_get_by_identifier_valid_upper_case() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + email = f.email() + new_user = cast(UserInfo, psg.user.create({"email": email})) # type: ignore[arg-type] + assert new_user.email == email + + user_by_identifier = cast(UserInfo, psg.user.get_by_identifier(email.upper())) + assert user_by_identifier.id == new_user.id + + user = cast(UserInfo, psg.user.get(new_user.id)) + assert user.id == new_user.id + + assert user_by_identifier == user + assert psg.user.delete(new_user.id) is None + + +def test_get_by_identifier_user_not_exist() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + with pytest.raises(PassageError, match="Could not find user with identifier"): + psg.user.get_by_identifier("error@passage.id") + + +def test_get_user_info_valid() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + user = cast(UserInfo, psg.getUser(PASSAGE_USER_ID)) + assert user.id == PASSAGE_USER_ID + + +def test_get_user_info_by_identifier_valid() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + email = f.email() + new_user = cast(UserInfo, psg.createUser({"email": email})) # type: ignore[arg-type] + assert new_user.email == email + + user_by_identifier = cast(UserInfo, psg.getUserByIdentifier(email)) + assert user_by_identifier.id == new_user.id + + user = cast(UserInfo, psg.getUser(new_user.id)) + assert user.id == new_user.id + + assert user_by_identifier == user + assert psg.deleteUser(new_user.id) + + +def test_get_user_info_by_identifier_phone_valid() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + phone = "+15005550030" + new_user = cast(UserInfo, psg.createUser({"phone": phone})) # type: ignore[arg-type] + assert new_user.phone == phone + + user_by_identifier = cast(UserInfo, psg.getUserByIdentifier(phone)) + assert user_by_identifier.id == new_user.id + + user = cast(UserInfo, psg.getUser(new_user.id)) + assert user.id == new_user.id + + assert user_by_identifier == user + assert psg.deleteUser(new_user.id) + + +def test_activate_user() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + user = cast(UserInfo, psg.activateUser(PASSAGE_USER_ID)) + assert user.status == "active" + + +def test_deactivate_user() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + user = cast(UserInfo, psg.getUser(PASSAGE_USER_ID)) + user = cast(UserInfo, psg.deactivateUser(user.id)) + assert user.status == "inactive" + + +def test_list_user_devices() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + devices = cast(list, psg.listUserDevices(PASSAGE_USER_ID)) + assert len(devices) == 2 + + +def test_update_user_phone() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + phone = "+15005550021" + new_user = cast(UserInfo, psg.createUser({"phone": phone})) # type: ignore[arg-type] + + phone = "+15005550022" + user = cast(UserInfo, psg.updateUser(new_user.id, {"phone": phone})) # type: ignore[arg-type] + assert user.phone == phone + assert psg.deleteUser(new_user.id) + + +def test_update_user_email() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + email = f.email() + req = UpdateUserRequest(email=email) + user = cast(UserInfo, psg.updateUser(PASSAGE_USER_ID, req)) + assert user.email == email + + +def test_update_user_with_metadata() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + email = f.email() + user = cast(UserInfo, psg.updateUser(PASSAGE_USER_ID, {"email": email, "user_metadata": {"example1": "qwe"}})) # type: ignore[arg-type] + assert user.email == email + assert user.user_metadata["example1"] == "qwe" # type: ignore[index] + + user = cast(UserInfo, psg.updateUser(PASSAGE_USER_ID, {"email": email, "user_metadata": {"example1": "asd"}})) # type: ignore[arg-type] + assert user.email == email + assert user.user_metadata["example1"] == "asd" # type: ignore[index] + + +def test_create_user_with_metadata() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + email = f.email() + user = cast(UserInfo, psg.createUser({"email": email, "user_metadata": {"example1": "qwe"}})) # type: ignore[arg-type] + assert user.email == email + assert user.user_metadata["example1"] == "qwe" # type: ignore[index] + assert psg.deleteUser(user.id) + + +def test_create_and_delete_user() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + email = f.email() + new_user = cast(UserInfo, psg.createUser({"email": email})) # type: ignore[arg-type] + assert new_user.email == email + assert psg.deleteUser(new_user.id) + + +def test_sign_out() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + assert psg.signOut(PASSAGE_USER_ID) + + +def test_revoke_user_refresh_tokens() -> None: + psg = Passage(PASSAGE_APP_ID, PASSAGE_API_KEY) + + assert psg.revokeUserRefreshTokens(PASSAGE_USER_ID)