diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4f2523f..e221f0c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -104,21 +104,21 @@ repos: exclude: ^tests - repo: https://github.com/PyCQA/pylint - rev: v4.0.3 + rev: v4.0.4 hooks: - id: pylint args: - --exit-zero - repo: https://github.com/asottile/pyupgrade - rev: v3.21.1 + rev: v3.21.2 hooks: - id: pyupgrade args: [--py310-plus, --keep-runtime-typing] - repo: https://github.com/charliermarsh/ruff-pre-commit # Ruff version. - rev: v0.14.5 + rev: v0.14.8 hooks: # Run the linter. - id: ruff-check @@ -139,12 +139,13 @@ repos: - id: refurb - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.18.2 + rev: v1.19.0 hooks: - id: mypy args: [--config-file=./pyproject.toml] additional_dependencies: - types-requests + - pytest-order exclude: ^mailgun/examples/ - repo: https://github.com/RobertCraigie/pyright-python @@ -153,7 +154,7 @@ repos: - id: pyright - repo: https://github.com/PyCQA/bandit - rev: 1.8.6 + rev: 1.9.2 hooks: - id: bandit args: ["-c", "pyproject.toml", "-r", "."] diff --git a/CHANGELOG.md b/CHANGELOG.md index 82e39ee..b8d2f42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,47 @@ We [keep a changelog.](http://keepachangelog.com/) ## [Unreleased] +### Added + +- Add missing endpoints: + + - Add "users", "me" to the `users` key of special cases in the class `Config`. + - Add `handle_users` to `mailgun.handlers.users_handler` for parsing [Users API](https://documentation.mailgun.com/docs/mailgun/api-reference/send/mailgun/users). + - Add `handle_mailboxes_credentials()` to `mailgun.handlers.domains_handler` for parsing `Update Mailgun SMTP credentials` in [Credentials API](https://documentation.mailgun.com/docs/mailgun/api-reference/send/mailgun/credentials). + +- Examples: + + - Move credentials examples from `mailgun/examples/domain_examples.py` to `mailgun/examples/credentials_examples.py` and add a new example `put_mailboxes_credentials()`. + - Add the `get_routes_match()` example to `mailgun/examples/routes_examples.py` + - Add the `update_template_version_copy()` example to `mailgun/examples/templates_examples.py` + - Add `mailgun/examples/users_examples.py` + +- Docs: + + - Add `Credentials` and `Users` sections with examples to `README.md`. + - Add docstrings to the test class `UsersTests` & `AsyncUsersTests` and theirs methods. + +- Tests: + + - Add `test_put_mailboxes_credentials` to `DomainTests` and `AsyncDomainTests` + - Add `test_get_routes_match` to `RoutesTests` and `AsyncRoutesTests` + - Add `test_update_template_version_copy` to `TemplatesTests ` and `AsyncTemplatesTests ` + - Add classes `UsersTests` and `AsyncUsersTests` to `tests/tests.py`. + +### Changed + +- Update `handle_templates()` in `mailgun/handlers/templates_handler.py` to handle `new_tag` + +- Update CI workflows: update `pre-commit` hooks to the latest versions. + +- Modify `mypy`'s additional_dependencies in `.pre-commit-config.yaml` to suppress `error: Untyped decorator makes function` by adding `pytest-order` + +- Replace spaces with tabs in `Makefile` + +### Pull Requests Merged + +- [PR_25](https://github.com/mailgun/mailgun-python/pull/25) - Add missing endpoints + ## [1.4.0] - 2025-11-20 ### Added diff --git a/Makefile b/Makefile index 6be8e51..9092db1 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ export PRINT_HELP_PYSCRIPT BROWSER := python -c "$$BROWSER_PYSCRIPT" -clean: clean-cov clean-build clean-pyc clean-test clean-temp clean-other ## remove all build, test, coverage and Python artifacts +clean: clean-cov clean-build clean-pyc clean-test clean-temp clean-other ## remove all build, test, coverage and Python artifacts clean-cov: rm -rf .coverage @@ -48,7 +48,7 @@ clean-cov: rm -rf pytest.xml rm -rf pytest-coverage.txt -clean-build: ## remove build artifacts +clean-build: ## remove build artifacts rm -fr build/ rm -fr dist/ rm -fr .eggs/ @@ -58,19 +58,19 @@ clean-build: ## remove build artifacts clean-env: ## remove conda environment conda remove -y -n $(CONDA_ENV_NAME) --all ; conda info -clean-pyc: ## remove Python file artifacts +clean-pyc: ## remove Python file artifacts find . -name '*.pyc' -exec rm -f {} + find . -name '*.pyo' -exec rm -f {} + find . -name '*~' -exec rm -f {} + find . -name '__pycache__' -exec rm -fr {} + -clean-test: ## remove test and coverage artifacts +clean-test: ## remove test and coverage artifacts rm -fr .tox/ rm -f .coverage rm -fr htmlcov/ rm -fr .pytest_cache -clean-temp: ## remove temp artifacts +clean-temp: ## remove temp artifacts rm -fr temp/tmp.txt rm -fr tmp.txt @@ -84,21 +84,21 @@ clean-other: help: $(PYTHON3) -c "$$PRINT_HELP_PYSCRIPT" < $(MAKEFILE_LIST) -environment: ## handles environment creation +environment: ## handles environment creation conda env create -f environment.yaml --name $(CONDA_ENV_NAME) --yes conda run --name $(CONDA_ENV_NAME) pip install . -environment-dev: ## Handles environment creation +environment-dev: ## Handles environment creation conda env create -n $(CONDA_ENV_NAME)-dev -y --file environment-dev.yml conda run --name $(CONDA_ENV_NAME)-dev pip install -e . install: clean ## install the package to the active Python's site-packages pip install . -release: dist ## package and upload a release +release: dist ## package and upload a release twine upload dist/* -dist: clean ## builds source and wheel package +dist: clean ## builds source and wheel package python -m build ls -l dist @@ -112,7 +112,7 @@ dev-full: clean ## install the package's development version to a fresh environ $(CONDA_ACTIVATE) $(CONDA_ENV_NAME)-dev && pre-commit install -pre-commit: ## runs pre-commit against files. NOTE: older files are disabled in the pre-commit config. +pre-commit: ## runs pre-commit against files. NOTE: older files are disabled in the pre-commit config. pre-commit run --all-files check-env: @@ -141,7 +141,7 @@ test-cov: check-env ## checks test coverage requirements tests-cov-fail: @pytest --cov=$(SRC_DIR) --cov-report term-missing --cov-report=html --cov-fail-under=80 -coverage: ## check code coverage quickly with the default Python +coverage: ## check code coverage quickly with the default Python coverage run --source $(SRC_DIR) -m pytest coverage report -m coverage html diff --git a/README.md b/README.md index 178692b..1b97d3b 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,12 @@ Check out all the resources and Python code examples in the official - [Create a single validation](#create-a-single-validation) - [Inbox placement](#inbox-placement) - [Get all inbox](#get-all-inbox) + - [Credentials](#credentials) + - [List Mailgun SMTP credential metadata for a given domain](#list-mailgun-smtp-credential-metadata-for-a-given-domain) + - [Create Mailgun SMTP credentials for a given domain](#create-mailgun-smtp-credentials-for-a-given-domain) + - [Users](#users) + - [Get users on an account](#get-users-on-an-account) + - [Get a user's details](#) - [License](#license) - [Contribute](#contribute) - [Contributors](#contributors) @@ -1307,6 +1313,70 @@ def get_all_inbox() -> None: print(req.json()) ``` +### Credentials + +#### List Mailgun SMTP credential metadata for a given domain + +```python +def get_credentials() -> None: + """ + GET /domains//credentials + :return: + """ + request = client.domains_credentials.get(domain=domain) + print(request.json()) +``` + +#### Create Mailgun SMTP credentials for a given domain + +```python +def post_credentials() -> None: + """ + POST /domains//credentials + :return: + """ + data = { + "login": f"alice_bob@{domain}", + "password": "test_new_creds123", # pragma: allowlist secret + } + request = client.domains_credentials.create(domain=domain, data=data) + print(request.json()) +``` + +### Users + +#### Get users on an account + +```python +def get_users() -> None: + """ + GET /v5/users + :return: + """ + query = {"role": "admin", "limit": "0", "skip": "0"} + req = client.users.get(filters=query) + print(req.json()) +``` + +#### Get a user's details + +```python +def get_user_details() -> None: + """ + GET /v5/users/{user_id} + :return: + """ + mailgun_email = os.environ["MAILGUN_EMAIL"] + query = {"role": "admin", "limit": "0", "skip": "0"} + req1 = client.users.get(filters=query) + users = req1.json()["users"] + + for user in users: + if mailgun_email == user["email"]: + req2 = client.users.get(user_id=user["id"]) + print(req2.json()) +``` + ## License [Apache-2.0](https://choosealicense.com/licenses/apache-2.0/) diff --git a/mailgun/client.py b/mailgun/client.py index a95eee5..75c76b8 100644 --- a/mailgun/client.py +++ b/mailgun/client.py @@ -30,6 +30,7 @@ from mailgun.handlers.default_handler import handle_default from mailgun.handlers.domains_handler import handle_domainlist from mailgun.handlers.domains_handler import handle_domains +from mailgun.handlers.domains_handler import handle_mailboxes_credentials from mailgun.handlers.domains_handler import handle_sending_queues from mailgun.handlers.email_validation_handler import handle_address_validate from mailgun.handlers.error_handler import ApiError @@ -46,6 +47,7 @@ from mailgun.handlers.suppressions_handler import handle_whitelists from mailgun.handlers.tags_handler import handle_tags from mailgun.handlers.templates_handler import handle_templates +from mailgun.handlers.users_handler import handle_users if TYPE_CHECKING: @@ -65,6 +67,7 @@ "dkim_selector": handle_domains, "web_prefix": handle_domains, "sending_queues": handle_sending_queues, + "mailboxes": handle_mailboxes_credentials, "ips": handle_ips, "ip_pools": handle_ippools, "tags": handle_tags, @@ -82,6 +85,7 @@ "events": handle_default, "analytics": handle_metrics, "bounce-classification": handle_bounce_classification, + "users": handle_users, } @@ -144,6 +148,10 @@ def __getitem__(self, key: str) -> tuple[Any, dict[str, str]]: "base": v2_base, "keys": ["bounce-classification", "metrics"], }, + "users": { + "base": v5_base, + "keys": ["users", "me"], + }, } if key in special_cases: @@ -165,6 +173,12 @@ def __getitem__(self, key: str) -> tuple[Any, dict[str, str]]: "keys": f"{part1}-{part2}".split("_"), }, headers + if "users" in key: + return { + "base": v5_base, + "keys": key.split("_"), + }, headers + # Handle DIPP endpoints if "subaccount" in key: if "ip_pools" in key: diff --git a/mailgun/examples/credentials_examples.py b/mailgun/examples/credentials_examples.py new file mode 100644 index 0000000..07b6f25 --- /dev/null +++ b/mailgun/examples/credentials_examples.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import os + +from mailgun.client import Client + + +key: str = os.environ["APIKEY"] +domain: str = os.environ["DOMAIN"] + +client: Client = Client(auth=("api", key)) + + +def get_credentials() -> None: + """ + GET /domains//credentials + :return: + """ + request = client.domains_credentials.get(domain=domain) + print(request.json()) + + +def post_credentials() -> None: + """ + POST /domains//credentials + :return: + """ + data = { + "login": f"alice_bob@{domain}", + "password": "test_new_creds123", # pragma: allowlist secret + } + request = client.domains_credentials.create(domain=domain, data=data) + print(request.json()) + + +def put_credentials() -> None: + """ + PUT /domains//credentials/ + :return: + """ + data = {"password": "test_new_creds12356"} # pragma: allowlist secret + request = client.domains_credentials.put(domain=domain, data=data, login=f"alice_bob@{domain}") + print(request.json()) + + +def put_mailboxes_credentials() -> None: + """ + PUT /v3/{domain_name}/mailboxes/{spec} + :return: + """ + + req = client.mailboxes.put(domain=domain, login=f"alice_bob@{domain}") + print(req.json()) + + +def delete_all_domain_credentials() -> None: + """ + DELETE /domains//credentials + :return: + """ + request = client.domains_credentials.delete(domain=domain) + print(request.json()) + + +def delete_credentials() -> None: + """ + DELETE /domains//credentials/ + :return: + """ + request = client.domains_credentials.delete(domain=domain, login=f"alice_bob@{domain}") + print(request.json()) + + +if __name__ == "__main__": + put_mailboxes_credentials() diff --git a/mailgun/examples/domain_examples.py b/mailgun/examples/domain_examples.py index 16f0d66..f703cbe 100644 --- a/mailgun/examples/domain_examples.py +++ b/mailgun/examples/domain_examples.py @@ -81,56 +81,6 @@ def delete_domain() -> None: print(request.status_code) -def get_credentials() -> None: - """ - GET /domains//credentials - :return: - """ - request = client.domains_credentials.get(domain=domain) - print(request.json()) - - -def post_credentials() -> None: - """ - POST /domains//credentials - :return: - """ - data = { - "login": f"alice_bob@{domain}", - "password": "test_new_creds123", # pragma: allowlist secret - } - request = client.domains_credentials.create(domain=domain, data=data) - print(request.json()) - - -def put_credentials() -> None: - """ - PUT /domains//credentials/ - :return: - """ - data = {"password": "test_new_creds12356"} # pragma: allowlist secret - request = client.domains_credentials.put(domain=domain, data=data, login=f"alice_bob@{domain}") - print(request.json()) - - -def delete_all_domain_credentials() -> None: - """ - DELETE /domains//credentials - :return: - """ - request = client.domains_credentials.delete(domain=domain) - print(request.json()) - - -def delete_credentials() -> None: - """ - DELETE /domains//credentials/ - :return: - """ - request = client.domains_credentials.delete(domain=domain, login=f"alice_bob@{domain}") - print(request.json()) - - def get_connections() -> None: """ GET /domains//connection @@ -238,4 +188,5 @@ def get_sending_queues() -> None: if __name__ == "__main__": + add_domain() get_domains() diff --git a/mailgun/examples/routes_examples.py b/mailgun/examples/routes_examples.py index 97ea77f..93cff8b 100644 --- a/mailgun/examples/routes_examples.py +++ b/mailgun/examples/routes_examples.py @@ -5,6 +5,7 @@ key: str = os.environ["APIKEY"] domain: str = os.environ["DOMAIN"] +sender: str = os.environ["MESSAGES_FROM"] client: Client = Client(auth=("api", key)) @@ -67,5 +68,15 @@ def delete_route() -> None: print(req.json()) +def get_routes_match() -> None: + """ + GET /routes/match + :return: + """ + query = {"address": sender} + req = client.routes_match.get(domain=domain, filters=query) + print(req.json()) + + if __name__ == "__main__": - delete_route() + get_routes_match() diff --git a/mailgun/examples/templates_examples.py b/mailgun/examples/templates_examples.py index 4297e96..f9da8c8 100644 --- a/mailgun/examples/templates_examples.py +++ b/mailgun/examples/templates_examples.py @@ -141,5 +141,27 @@ def get_all_versions() -> None: print(req.json()) +def update_template_version_copy() -> None: + """ + PUT /v3/{domain_name}/templates/{template_name}/versions/{version_name}/copy/{new_version_name} + :return: + """ + data = {"comment": "An updated version comment"} + + req = client.templates.put( + domain=domain, + filters=data, + template_name="template.name1", + versions=True, + tag="v2", + copy=True, + new_tag="v3", + ) + print(req.json()) + + if __name__ == "__main__": - get_all_versions() + # get_all_versions() + post_template() + create_new_template_version() + update_template_version_copy() diff --git a/mailgun/examples/users_examples.py b/mailgun/examples/users_examples.py new file mode 100644 index 0000000..172b122 --- /dev/null +++ b/mailgun/examples/users_examples.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import os + +from mailgun.client import Client + + +key: str = os.environ["APIKEY"] +domain: str = os.environ["DOMAIN"] +mailgun_email = os.environ["MAILGUN_EMAIL"] + +client: Client = Client(auth=("api", key)) + + +def get_users() -> None: + """ + GET /v5/users + :return: + """ + query = {"role": "admin", "limit": "0", "skip": "0"} + req = client.users.get(filters=query) + print(req.json()) + + +def get_own_user_details() -> None: + """ + GET /v5/users/me + + Please note, for the command("Get one's own user details") to be successful, you must use a Web type API key for the call. Private type API keys will Not work. + This below Call will generate a Web API key tied to the account user associated with the data inputted for the USER_EMAIL field and USER_ID values. This is returned by the API in the "secret":"API_KEY" key/value pair. This key will authenticate the call(Get one's own user details) made to the /v5/users/me endpoint, and will return the user's data associated with the USER_EMAIL and USER_ID values. + + Generate Web API Key: + curl -i -X POST \ + -u api:API_KEY \ + https://api.mailgun.net/v1/keys \ + -F email=USER_EMAIL \ + -F kind=web \ + -F expiration=SECONDS (Lifetime of the key in seconds) \ + -F role=ROLE \ + -F user_id=USER_ID \ + -F description=DESCRIPTION + + see https://documentation.mailgun.com/docs/mailgun/api-reference/send/mailgun/keys/api.(*keysapi).createkey-fm-7 + + Important Notes: + API_KEY - Private API Key + USER_EMAIL - The user login email address of the user that is trying to make the call to the /v5/users/me endpoint. + SECONDS - How many seconds you want the key to be active before it expires. + ROLE - The role of the API Key. This dictates what permissions the key has (https://help.mailgun.com/hc/en-us/articles/26016288026907-API-Key-Roles) + USER_ID - The internal User ID of the user that is trying to call the /v5/users/me endpoint. This is present in the URL in the address bar when viewing the User details in the GUI or in Admin. Both will show /users/USER_ID in the address. + DESCRIPTION - Description of the key. + + :return: + """ + secret: str = os.environ["SECRET"] + client_with_secret_key: Client = Client(auth=("api", secret)) + req = client_with_secret_key.users.get(user_id="me") + print(req.json()) + + +def get_user_details() -> None: + """ + GET /v5/users/{user_id} + :return: + """ + query = {"role": "admin", "limit": "0", "skip": "0"} + req1 = client.users.get(filters=query) + users = req1.json()["users"] + + for user in users: + if mailgun_email == user["email"]: + req2 = client.users.get(user_id=user["id"]) + print(req2.json()) + + +if __name__ == "__main__": + get_users() + get_own_user_details() + get_user_details() diff --git a/mailgun/handlers/domains_handler.py b/mailgun/handlers/domains_handler.py index 30a96ac..1aa148c 100644 --- a/mailgun/handlers/domains_handler.py +++ b/mailgun/handlers/domains_handler.py @@ -97,3 +97,27 @@ def handle_sending_queues( ) -> str | Any: """Handle sending queues endpoint URL construction.""" return url["base"][:-1] + f"/{domain}/sending_queues" + + +def handle_mailboxes_credentials( + url: dict[str, Any], + domain: str | None, + _method: str | None, + **kwargs: Any, +) -> Any: + """Handle Mailboxes credentials. + + :param url: Incoming URL dictionary + :type url: dict + :param domain: Incoming domain + :type domain: str + :param _method: Incoming request method (it's not being used for this handler) + :type _method: str + :param kwargs: kwargs + :return: final url for Mailboxes credentials endpoint + """ + final_keys = path.join("/", *url["keys"]) if url["keys"] else "" + if "login" in kwargs: + url = url["base"] + domain + final_keys + "/" + kwargs["login"] + + return url diff --git a/mailgun/handlers/templates_handler.py b/mailgun/handlers/templates_handler.py index 09fc8f7..4bd7437 100644 --- a/mailgun/handlers/templates_handler.py +++ b/mailgun/handlers/templates_handler.py @@ -30,33 +30,24 @@ def handle_templates( :raises: ApiError """ final_keys = path.join("/", *url["keys"]) if url["keys"] else "" - if "template_name" in kwargs: - if "versions" in kwargs: - if kwargs["versions"]: - if "tag" in kwargs: - url = ( - url["base"] - + domain - + final_keys - + "/" - + kwargs["template_name"] - + "/versions/" - + kwargs["tag"] - ) - else: - url = ( - url["base"] - + domain - + final_keys - + "/" - + kwargs["template_name"] - + "/versions" - ) - else: - raise ApiError("Versions should be True or absent") - else: - url = url["base"] + domain + final_keys + "/" + kwargs["template_name"] - else: - url = url["base"] + domain + final_keys - - return url + domain_url = f"{url['base']}{domain}{final_keys}" + + if "template_name" not in kwargs: + return domain_url + + template_url = domain_url + f"/{kwargs['template_name']}" + + if "versions" not in kwargs: + return template_url + + if not kwargs["versions"]: + raise ApiError("Versions should be True or absent") + + versions_url = template_url + "/versions" + + if "tag" in kwargs and "copy" not in kwargs: + return versions_url + f"/{kwargs['tag']}" + if "tag" in kwargs and "copy" in kwargs and "new_tag" in kwargs: + return versions_url + f"/{kwargs['tag']}/copy/{kwargs['new_tag']}" + + return versions_url diff --git a/mailgun/handlers/users_handler.py b/mailgun/handlers/users_handler.py new file mode 100644 index 0000000..04b7d10 --- /dev/null +++ b/mailgun/handlers/users_handler.py @@ -0,0 +1,37 @@ +"""USERS HANDLER. + +Doc: https://documentation.mailgun.com/docs/mailgun/api-reference/send/mailgun/users +""" + +from __future__ import annotations + +from os import path +from typing import Any + + +def handle_users( + url: dict[str, Any], + _domain: str | None, + _method: str | None, + **kwargs: Any, +) -> Any: + """Handle Users. + + :param url: Incoming URL dictionary + :type url: dict + :param _domain: Incoming domain (it's not being used for this handler) + :type _domain: str + :param _method: Incoming request method (it's not being used for this handler) + :type _method: str + :param kwargs: kwargs + :return: final url for Users endpoint + """ + final_keys = path.join("/", *url["keys"]) if url["keys"] else "" + if "user_id" in kwargs and kwargs["user_id"] != "me": + url = url["base"][:-1] + "/" + "users" + "/" + kwargs["user_id"] + elif "user_id" in kwargs and kwargs["user_id"] == "me": + url = url["base"][:-1] + final_keys + else: + url = url["base"][:-1] + "/" + "users" + + return url diff --git a/tests/tests.py b/tests/tests.py index 54b115c..1f4e471 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -169,6 +169,31 @@ def test_put_domain_creds(self) -> None: self.assertEqual(request.status_code, 200) self.assertIn("message", request.json()) + @pytest.mark.order(2) + def test_put_mailboxes_credentials(self) -> None: + """Test to update Mailgun SMTP credentials: Happy Path with valid data.""" + self.client.domains_credentials.create( + domain=self.domain, + data=self.post_domain_creds, + ) + name = "alice_bob" + req = self.client.mailboxes.put(domain=self.domain, login=f"{name}@{self.domain}") + + expected_keys = [ + "message", + "note", + "credentials", + ] + expected_credentials_keys = [ + f"{name}@{self.domain}", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json().keys()] # type: ignore[func-returns-value] + self.assertIn("Password changed", req.json()["message"]) + [self.assertIn(key, expected_credentials_keys) for key in req.json()["credentials"]] # type: ignore[func-returns-value] + @pytest.mark.order(3) def test_get_domain_list(self) -> None: req = self.client.domainlist.get() @@ -850,6 +875,7 @@ def setUp(self) -> None: ) self.client: Client = Client(auth=self.auth) self.domain: str = os.environ["DOMAIN"] + self.sender: str = os.environ["MESSAGES_FROM"] self.routes_data: dict[str, int | str | list[str]] = { "priority": 0, "description": "Sample route", @@ -864,7 +890,7 @@ def setUp(self) -> None: "priority": 2, } - # 'Routes quota (1) is exceeded for a free plan + # 'Routes quota (1) is exceeded for a free plan' def test_routes_create(self) -> None: params = {"skip": 0, "limit": 1} req1 = self.client.routes.get(domain=self.domain, filters=params) @@ -979,6 +1005,21 @@ def test_routes_delete(self) -> None: self.assertEqual(req.status_code, 200) self.assertIn("message", req.json()) + def test_get_routes_match(self) -> None: + """Test to match address to route: Happy Path with valid data.""" + + query = {"address": self.sender} + req = self.client.routes_match.get(domain=self.domain, filters=query) + + self.assertEqual(req.status_code, 200) + self.assertIn("route", req.json()) + + expected_keys = ["actions", "created_at", "description", "expression", "id", "priority"] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json()["route"].keys()] # type: ignore[func-returns-value] + class WebhooksTests(unittest.TestCase): """Tests for Mailgun Webhooks API. @@ -1412,6 +1453,44 @@ def test_delete_version_template(self) -> None: self.assertEqual(req.status_code, 200) + def test_update_template_version_copy(self) -> None: + """Test to copy an existing version into a new version with the provided name: Happy Path with valid data.""" + data = {"comment": "An updated version comment"} + + req = self.client.templates.put( + domain=self.domain, + filters=data, + template_name="template.name1", + versions=True, + tag="v2", + copy=True, + new_tag="v3", + ) + + expected_keys = [ + "message", + "version", + "template", + ] + expected_template_keys = [ + "tag", + "template", + "engine", + "mjml", + "createdAt", + "comment", + "active", + "id", + "headers", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json().keys()] # type: ignore[func-returns-value] + self.assertIn("tag", req.json()["version"]) + self.assertIn("version has been copied", req.json()["message"]) + [self.assertIn(key, expected_template_keys) for key in req.json()["template"]] # type: ignore[func-returns-value] + @pytest.mark.skip( "Email Validation is only available through Mailgun paid plans, see https://www.mailgun.com/pricing/" @@ -2073,6 +2152,157 @@ def test_get_account_tag_incorrect_url_without_limits_part(self) -> None: self.assertIn("not found", req.json()["error"]) +class UsersTests(unittest.TestCase): + """Tests for Mailgun Users API, https://api.mailgun.net/v5/users. + + This class provides setup functionality for tests involving + with authentication and client initialization handled + in `setUp`. Each test in this suite operates with the configured Mailgun client + instance to simulate API interactions. + + """ + + def setUp(self) -> None: + self.auth: tuple[str, str] = ( + "api", + os.environ["APIKEY"], + ) + self.secret: tuple[str, str] = ( + "api", + os.environ["SECRET"], + ) + self.client: Client = Client(auth=self.auth) + self.client_with_secret_key = Client(auth=self.secret) + self.domain: str = os.environ["DOMAIN"] + self.mailgun_email = os.environ["MAILGUN_EMAIL"] + + def test_get_users(self) -> None: + """Test to get account's users details: Happy Path with valid data.""" + query = {"role": "admin", "limit": "0", "skip": "0"} + req = self.client.users.get(filters=query) + + expected_keys = [ + "total", + "users", + ] + + expected_users_keys = [ + "account_id", + "activated", + "auth", + "email", + "email_details", + "github_user_id", + "id", + "is_disabled", + "is_master", + "metadata", + "migration_status", + "name", + "opened_ip", + "password_updated_at", + "preferences", + "role", + "salesforce_user_id", + "tfa_active", + "tfa_created_at", + "tfa_enabled", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] + [self.assertIn(key, expected_users_keys) for key in req.json()["users"][0]] # type: ignore[func-returns-value] + + def test_get_user_invalid_url(self) -> None: + """Test to get account's users details: expected failure with invalid URL.""" + query = {"role": "admin", "limit": "0", "skip": "0"} + + with self.assertRaises(KeyError) as cm: + self.client.user.get(filters=query) + + @pytest.mark.xfail + def test_own_user_details(self) -> None: + req = self.client_with_secret_key.users.get(user_id="me") + + expected_users_keys = [ + "account_id", + "activated", + "auth", + "email", + "email_details", + "github_user_id", + "id", + "is_disabled", + "is_master", + "metadata", + "migration_status", + "name", + "opened_ip", + "password_updated_at", + "preferences", + "role", + "salesforce_user_id", + "tfa_active", + "tfa_created_at", + "tfa_enabled", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_users_keys) for key in req.json()] # type: ignore[func-returns-value] + + def test_get_user_details(self) -> None: + """Test to get account's users details: happy path.""" + query = {"role": "admin", "limit": "0", "skip": "0"} + req1 = self.client.users.get(filters=query) + users = req1.json()["users"] + + for user in users: + if self.mailgun_email == user["email"]: + req2 = self.client.users.get(user_id=user["id"]) + + expected_users_keys = [ + "account_id", + "activated", + "auth", + "email", + "email_details", + "github_user_id", + "id", + "is_disabled", + "is_master", + "metadata", + "migration_status", + "name", + "opened_ip", + "password_updated_at", + "preferences", + "role", + "salesforce_user_id", + "tfa_active", + "tfa_created_at", + "tfa_enabled", + ] + + self.assertIsInstance(req2.json(), dict) + self.assertEqual(req2.status_code, 200) + [self.assertIn(key, expected_users_keys) for key in req2.json()] # type: ignore[func-returns-value] + break + + def test_get_invalid_user_details(self) -> None: + """Test to get user details: expected failure with invalid user_id.""" + query = {"role": "admin", "limit": "0", "skip": "0"} + req1 = self.client.users.get(filters=query) + users = req1.json()["users"] + + for user in users: + if self.mailgun_email == user["email"]: + req2 = self.client.users.get(user_id="xxxxxxx") + + self.assertIsInstance(req2.json(), dict) + self.assertEqual(req2.status_code, 404) + # ============================================================================ # Async Test Classes (using AsyncClient and AsyncEndpoint) # ============================================================================ @@ -2212,6 +2442,33 @@ async def test_put_domain_creds(self) -> None: self.assertEqual(request.status_code, 200) self.assertIn("message", request.json()) + @pytest.mark.order(2) + async def test_put_mailboxes_credentials(self) -> None: + """Test to update Mailgun SMTP credentials: Happy Path with valid data.""" + await self.client.domains_credentials.create( + domain=self.domain, + data=self.post_domain_creds, + ) + name = "alice_bob" + req = await self.client.mailboxes.put(domain=self.domain, login=f"{name}@{self.domain}") + print(req) + print(req.json()) + + expected_keys = [ + "message", + "note", + "credentials", + ] + expected_credentials_keys = [ + f"{name}@{self.domain}", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json().keys()] # type: ignore[func-returns-value] + self.assertIn("Password changed", req.json()["message"]) + [self.assertIn(key, expected_credentials_keys) for key in req.json()["credentials"]] # type: ignore[func-returns-value] + @pytest.mark.order(3) async def test_get_domain_list(self) -> None: req = await self.client.domainlist.get() @@ -2861,6 +3118,7 @@ async def asyncSetUp(self) -> None: ) self.client: AsyncClient = AsyncClient(auth=self.auth) self.domain: str = os.environ["DOMAIN"] + self.sender: str = os.environ["MESSAGES_FROM"] self.routes_data: dict[str, int | str | list[str]] = { "priority": 0, "description": "Sample route", @@ -2979,6 +3237,33 @@ async def test_routes_delete(self) -> None: self.assertEqual(req.status_code, 200) self.assertIn("message", req.json()) + async def test_get_routes_match(self) -> None: + """Test to match address to route: Happy Path with valid data.""" + params = {"skip": 0, "limit": 1} + query = {"address": self.sender} + req1 = await self.client.routes.get(domain=self.domain, filters=params) + print('len(req1.json()["items"]): ', len(req1.json()["items"])) + if len(req1.json()["items"]) > 0: + await self.client.routes.delete( + domain=self.domain, + route_id=req1.json()["items"][0]["id"], + ) + + await self.client.routes.create(domain=self.domain, data=self.routes_data) + req = await self.client.routes_match.get(domain=self.domain, filters=query) + else: + await self.client.routes.create(domain=self.domain, data=self.routes_data) + req = await self.client.routes_match.get(domain=self.domain, filters=query) + + self.assertEqual(req.status_code, 200) + self.assertIn("route", req.json()) + + expected_keys = ["actions", "created_at", "description", "expression", "id", "priority"] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json()["route"].keys()] # type: ignore[func-returns-value] + class AsyncWebhooksTests(unittest.IsolatedAsyncioTestCase): """Async tests for Mailgun Webhooks API using AsyncClient.""" @@ -3402,6 +3687,53 @@ async def test_delete_version_template(self) -> None: self.assertEqual(req.status_code, 200) + async def test_update_template_version_copy(self) -> None: + """Test to copy an existing version into a new version with the provided name: Happy Path with valid data.""" + await self.client.templates.create(data=self.post_template_data, domain=self.domain) + + await self.client.templates.create( + data=self.post_template_version_data, + domain=self.domain, + template_name=self.post_template_data["name"], + versions=True, + ) + + data = {"comment": "An updated version comment"} + + req = await self.client.templates.put( + domain=self.domain, + filters=data, + template_name="template.name1", + versions=True, + tag="v2", + copy=True, + new_tag="v3", + ) + + expected_keys = [ + "message", + "version", + "template", + ] + expected_template_keys = [ + "tag", + "template", + "engine", + "mjml", + "createdAt", + "comment", + "active", + "id", + "headers", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json().keys()] # type: ignore[func-returns-value] + self.assertIn("tag", req.json()["version"]) + self.assertIn("version has been copied", req.json()["message"]) + [self.assertIn(key, expected_template_keys) for key in req.json()["template"]] # type: ignore[func-returns-value] + @pytest.mark.skip( "Email Validation is only available through Mailgun paid plans, see https://www.mailgun.com/pricing/" @@ -4041,6 +4373,158 @@ async def test_get_account_tag_incorrect_url_without_limits_part(self) -> None: self.assertIn("not found", req.json()["error"]) +class AsyncUsersTests(unittest.IsolatedAsyncioTestCase): + """Async tests for Mailgun Users API using AsyncClient.""" + + async def asyncSetUp(self) -> None: + self.auth: tuple[str, str] = ( + "api", + os.environ["APIKEY"], + ) + self.secret: tuple[str, str] = ( + "api", + os.environ["SECRET"], + ) + self.client: AsyncClient = AsyncClient(auth=self.auth) + self.client_with_secret_key: AsyncClient = AsyncClient(auth=self.secret) + self.domain: str = os.environ["DOMAIN"] + self.mailgun_email = os.environ["MAILGUN_EMAIL"] + + async def asyncTearDown(self) -> None: + await self.client.aclose() + + async def test_get_users(self) -> None: + """Test to get account's users: happy path.""" + query = {"role": "admin", "limit": "0", "skip": "0"} + req = await self.client.users.get(filters=query) + + expected_keys = [ + "total", + "users", + ] + + expected_users_keys = [ + "account_id", + "activated", + "auth", + "email", + "email_details", + "github_user_id", + "id", + "is_disabled", + "is_master", + "metadata", + "migration_status", + "name", + "opened_ip", + "password_updated_at", + "preferences", + "role", + "salesforce_user_id", + "tfa_active", + "tfa_created_at", + "tfa_enabled", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] + [self.assertIn(key, expected_users_keys) for key in req.json()["users"][0]] # type: ignore[func-returns-value] + + async def test_get_user_invalid_url(self) -> None: + """Test to get account's users details: expected failure with invalid URL.""" + query = {"role": "admin", "limit": "0", "skip": "0"} + + with self.assertRaises(KeyError) as cm: + await self.client.user.get(filters=query) + + @pytest.mark.xfail + async def test_own_user_details(self) -> None: + req = await self.client_with_secret_key.users.get(user_id="me") + + expected_users_keys = [ + "account_id", + "activated", + "auth", + "email", + "email_details", + "github_user_id", + "id", + "is_disabled", + "is_master", + "metadata", + "migration_status", + "name", + "opened_ip", + "password_updated_at", + "preferences", + "role", + "salesforce_user_id", + "tfa_active", + "tfa_created_at", + "tfa_enabled", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_users_keys) for key in req.json()] # type: ignore[func-returns-value] + + async def test_get_user_details(self) -> None: + """Test to get user details: happy path.""" + """ + GET /v5/users/{user_id} + :return: + """ + query = {"role": "admin", "limit": "0", "skip": "0"} + req1 = await self.client.users.get(filters=query) + users = req1.json()["users"] + + for user in users: + if self.mailgun_email == user["email"]: + req2 = await self.client.users.get(user_id=user["id"]) + + expected_users_keys = [ + "account_id", + "activated", + "auth", + "email", + "email_details", + "github_user_id", + "id", + "is_disabled", + "is_master", + "metadata", + "migration_status", + "name", + "opened_ip", + "password_updated_at", + "preferences", + "role", + "salesforce_user_id", + "tfa_active", + "tfa_created_at", + "tfa_enabled", + ] + + self.assertIsInstance(req2.json(), dict) + self.assertEqual(req2.status_code, 200) + [self.assertIn(key, expected_users_keys) for key in req2.json()] # type: ignore[func-returns-value] + break + + async def test_get_invalid_user_details(self) -> None: + """Test to get user details: expected failure with invalid user_id.""" + query = {"role": "admin", "limit": "0", "skip": "0"} + req1 = await self.client.users.get(filters=query) + users = req1.json()["users"] + + for user in users: + if self.mailgun_email == user["email"]: + req2 = await self.client.users.get(user_id="xxxxxxx") + + self.assertIsInstance(req2.json(), dict) + self.assertEqual(req2.status_code, 404) + + class BounceClassificationTests(unittest.TestCase): """Tests for Mailgun Bounce Classification API, https://api.mailgun.net/v2/bounce-classification/metrics.