Skip to content

Commit e1e6c62

Browse files
Merge pull request #55 from infinityofspace/support_dnssec_api
Support dnssec api
2 parents 3eaffc8 + a08b4e5 commit e1e6c62

File tree

3 files changed

+303
-6
lines changed

3 files changed

+303
-6
lines changed

pkb_client/client/client.py

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,20 @@
22
import logging
33
from hashlib import sha256
44
from pathlib import Path
5-
from typing import Optional, List, Union
5+
from typing import List, Optional, Union
66
from urllib.parse import urljoin
77

88
import dns.resolver
99
import requests
1010

1111
from pkb_client.client import BindFile
1212
from pkb_client.client.dns import (
13+
DNS_RECORDS_WITH_PRIORITY,
1314
DNSRecord,
14-
DNSRestoreMode,
1515
DNSRecordType,
16-
DNS_RECORDS_WITH_PRIORITY,
16+
DNSRestoreMode,
1717
)
18+
from pkb_client.client.dnssec import DNSSECRecord
1819
from pkb_client.client.domain import DomainInfo
1920
from pkb_client.client.forwarding import URLForwarding, URLForwardingType
2021
from pkb_client.client.ssl_cert import SSLCertBundle
@@ -842,6 +843,112 @@ def get_ssl_bundle(self, domain) -> SSLCertBundle:
842843
response_json.get("message", "Unknown message"),
843844
)
844845

846+
def get_dnssec_records(self, domain: str) -> List[DNSSECRecord]:
847+
"""
848+
API DNSSEC retrieve method: retrieve all DNSSEC records for the given domain.
849+
See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Get%20Records for more info.
850+
851+
:param domain: the domain for which the DNSSEC records should be retrieved
852+
:return: list of :class:`DNSSECRecord` objects
853+
"""
854+
855+
url = urljoin(self.api_endpoint, f"dns/getDnssecRecords/{domain}")
856+
req_json = self._get_auth_request_json()
857+
r = requests.post(url=url, json=req_json)
858+
859+
if r.status_code == 200:
860+
return [
861+
DNSSECRecord.from_dict(record)
862+
for record in json.loads(r.text).get("records", {}).values()
863+
]
864+
else:
865+
response_json = json.loads(r.text)
866+
raise PKBClientException(
867+
response_json.get("status", "Unknown status"),
868+
response_json.get("message", "Unknown message"),
869+
)
870+
871+
def create_dnssec_record(
872+
self,
873+
domain: str,
874+
key_tag: int,
875+
alg: int,
876+
digest_type: int,
877+
digest: str,
878+
max_sig_life: Optional[int] = None,
879+
key_data_flags: Optional[int] = None,
880+
key_data_protocol: Optional[int] = None,
881+
key_data_algo: Optional[int] = None,
882+
key_data_pub_key: Optional[str] = None,
883+
) -> bool:
884+
"""
885+
API DNSSEC create method: create a new DNSSEC record for the given domain.
886+
See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Create%20Record for more info.
887+
888+
:param domain: the domain for which the DNSSEC record should be created
889+
:param key_tag: the key tag of the DNSSEC record
890+
:param alg: algorithm of the DNSSEC record
891+
:param digest_type: digest type of the DNSSEC record
892+
:param digest: digest of the DNSSEC record
893+
:param max_sig_life: maximum signature life of the DNSSEC record in seconds
894+
:param key_data_flags: key data flags of the DNSSEC record
895+
:param key_data_protocol: key data protocol of the DNSSEC record
896+
:param key_data_algo: key data algorithm of the DNSSEC record
897+
:param key_data_pub_key: key data public key of the DNSSEC record
898+
899+
:return: True if everything went well
900+
"""
901+
902+
if max_sig_life is not None and max_sig_life < 0:
903+
raise ValueError("max_sig_life must be greater than 0")
904+
905+
url = urljoin(self.api_endpoint, f"dns/createDnssecRecord/{domain}")
906+
req_json = {
907+
**self._get_auth_request_json(),
908+
"keyTag": key_tag,
909+
"alg": alg,
910+
"digestType": digest_type,
911+
"digest": digest,
912+
"maxSigLife": max_sig_life,
913+
"keyDataFlags": key_data_flags,
914+
"keyDataProtocol": key_data_protocol,
915+
"keyDataAlgo": key_data_algo,
916+
"keyDataPubKey": key_data_pub_key,
917+
}
918+
r = requests.post(url=url, json=req_json)
919+
920+
if r.status_code == 200:
921+
return True
922+
else:
923+
response_json = json.loads(r.text)
924+
raise PKBClientException(
925+
response_json.get("status", "Unknown status"),
926+
response_json.get("message", "Unknown message"),
927+
)
928+
929+
def delete_dnssec_record(self, domain: str, key_tag: int) -> bool:
930+
"""
931+
API DNSSEC delete method: delete an existing DNSSEC record for the given domain.
932+
See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Delete%20Record for more info.
933+
934+
:param domain: the domain for which the DNSSEC record should be deleted
935+
:param key_tag: the key tag of the DNSSEC record
936+
:return: True if everything went well
937+
"""
938+
939+
url = urljoin(self.api_endpoint, f"dns/deleteDnssecRecord/{domain}/{key_tag}")
940+
req_json = self._get_auth_request_json()
941+
r = requests.post(url=url, json=req_json)
942+
943+
if r.status_code == 200:
944+
return True
945+
else:
946+
response_json = json.loads(r.text)
947+
raise PKBClientException(
948+
response_json.get("status", "Unknown status"),
949+
response_json.get("message", "Unknown message"),
950+
)
951+
845952
@staticmethod
846953
def __handle_error_backup__(dns_records):
847954
# merge the single DNS records into one single dict with the record id as key

pkb_client/client/dnssec.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from dataclasses import dataclass
2+
from typing import Optional
3+
4+
5+
@dataclass
6+
class DNSSECRecord:
7+
key_tag: int # The key tag is a 16-bit integer that identifies the DNSKEY record
8+
alg: int # Indicates the algorithm used to generate the public key
9+
digest_type: int # Indicates the type of digest algorithm used
10+
digest: str # The digest of the public key
11+
max_sig_life: Optional[
12+
int
13+
] # Indicates the amount of time in seconds the signature is valid
14+
key_data_flags: Optional[
15+
int
16+
] # Indicates the key type (Zone-signing or Key-signing)
17+
key_data_protocol: Optional[int] # Indicates the protocol used for the key
18+
key_data_algo: Optional[int] # Indicates the algorithm used for the key
19+
key_data_pub_key: Optional[str] # The public key in base64 format
20+
21+
@staticmethod
22+
def from_dict(d):
23+
return DNSSECRecord(
24+
key_tag=int(d["keyTag"]),
25+
alg=int(d["alg"]),
26+
digest_type=int(d["digestType"]),
27+
digest=d["digest"],
28+
max_sig_life=int(d["maxSigLife"]) if "maxSigLife" in d else None,
29+
key_data_flags=int(d["keyDataFlags"]) if "keyDataFlags" in d else None,
30+
key_data_protocol=int(d["keyDataProtocol"])
31+
if "keyDataProtocol" in d
32+
else None,
33+
key_data_algo=int(d["keyDataAlgo"]) if "keyDataAlgo" in d else None,
34+
key_data_pub_key=d["keyDataPubKey"] if "keyDataPubKey" in d else None,
35+
)

tests/client.py

Lines changed: 158 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@
99
from responses.registries import OrderedRegistry
1010

1111
from pkb_client.client import (
12-
PKBClient,
13-
PKBClientException,
1412
API_ENDPOINT,
1513
DNSRestoreMode,
14+
PKBClient,
15+
PKBClientException,
16+
SSLCertBundle,
1617
)
17-
from pkb_client.client import SSLCertBundle
1818
from pkb_client.client.dns import DNSRecord, DNSRecordType
19+
from pkb_client.client.dnssec import DNSSECRecord
1920
from pkb_client.client.forwarding import URLForwarding, URLForwardingType
2021

2122

@@ -1020,6 +1021,160 @@ def test_import_bind_dns_records(self):
10201021

10211022
pkb_client.import_bind_dns_records(filename, DNSRestoreMode.clear)
10221023

1024+
@responses.activate
1025+
def test_get_dnssec_records(self):
1026+
pkb_client = PKBClient("key", "secret")
1027+
1028+
responses.post(
1029+
url=urljoin(API_ENDPOINT, "dns/getDnssecRecords/example.com"),
1030+
json={
1031+
"status": "SUCCESS",
1032+
"records": {
1033+
"12345": {
1034+
"keyTag": "12345",
1035+
"alg": "8",
1036+
"digestType": "1",
1037+
"digest": "abc123",
1038+
},
1039+
"12346": {
1040+
"keyTag": "12346",
1041+
"alg": "8",
1042+
"digestType": "1",
1043+
"digest": "abc456",
1044+
"maxSigLife": 3600,
1045+
"keyDataFlags": 257,
1046+
"keyDataProtocol": 3,
1047+
"keyDataAlgo": 8,
1048+
"keyDataPubKey": "abc789",
1049+
},
1050+
},
1051+
},
1052+
match=[
1053+
matchers.json_params_matcher(
1054+
{"apikey": "key", "secretapikey": "secret"}
1055+
)
1056+
],
1057+
)
1058+
dnssec_records = pkb_client.get_dnssec_records("example.com")
1059+
1060+
self.assertEqual(2, len(dnssec_records))
1061+
self.assertEqual(
1062+
DNSSECRecord(
1063+
key_tag=12345,
1064+
alg=8,
1065+
digest_type=1,
1066+
digest="abc123",
1067+
max_sig_life=None,
1068+
key_data_flags=None,
1069+
key_data_protocol=None,
1070+
key_data_algo=None,
1071+
key_data_pub_key=None,
1072+
),
1073+
dnssec_records[0],
1074+
)
1075+
self.assertEqual(
1076+
DNSSECRecord(
1077+
key_tag=12346,
1078+
alg=8,
1079+
digest_type=1,
1080+
digest="abc456",
1081+
max_sig_life=3600,
1082+
key_data_flags=257,
1083+
key_data_protocol=3,
1084+
key_data_algo=8,
1085+
key_data_pub_key="abc789",
1086+
),
1087+
dnssec_records[1],
1088+
)
1089+
1090+
@responses.activate
1091+
def test_create_dnssec_record(self):
1092+
pkb_client = PKBClient("key", "secret")
1093+
1094+
responses.post(
1095+
url=urljoin(API_ENDPOINT, "dns/createDnssecRecord/example.com"),
1096+
json={"status": "SUCCESS"},
1097+
match=[
1098+
matchers.json_params_matcher(
1099+
{
1100+
"apikey": "key",
1101+
"secretapikey": "secret",
1102+
"keyTag": 4242,
1103+
"alg": 12345,
1104+
"digestType": 8,
1105+
"digest": "abc123",
1106+
"maxSigLife": None,
1107+
"keyDataFlags": None,
1108+
"keyDataProtocol": None,
1109+
"keyDataAlgo": None,
1110+
"keyDataPubKey": None,
1111+
}
1112+
)
1113+
],
1114+
)
1115+
1116+
success = pkb_client.create_dnssec_record(
1117+
domain="example.com",
1118+
key_tag=4242,
1119+
alg=12345,
1120+
digest_type=8,
1121+
digest="abc123",
1122+
)
1123+
self.assertTrue(success)
1124+
1125+
responses.post(
1126+
url=urljoin(API_ENDPOINT, "dns/createDnssecRecord/example2.com"),
1127+
json={"status": "SUCCESS"},
1128+
match=[
1129+
matchers.json_params_matcher(
1130+
{
1131+
"apikey": "key",
1132+
"secretapikey": "secret",
1133+
"keyTag": 4242,
1134+
"alg": 12345,
1135+
"digestType": 8,
1136+
"digest": "abc123",
1137+
"maxSigLife": 42,
1138+
"keyDataFlags": 41,
1139+
"keyDataProtocol": 40,
1140+
"keyDataAlgo": 39,
1141+
"keyDataPubKey": "abc42",
1142+
}
1143+
)
1144+
],
1145+
)
1146+
1147+
success = pkb_client.create_dnssec_record(
1148+
domain="example2.com",
1149+
key_tag=4242,
1150+
alg=12345,
1151+
digest_type=8,
1152+
digest="abc123",
1153+
max_sig_life=42,
1154+
key_data_flags=41,
1155+
key_data_protocol=40,
1156+
key_data_algo=39,
1157+
key_data_pub_key="abc42",
1158+
)
1159+
self.assertTrue(success)
1160+
1161+
@responses.activate
1162+
def delete_dnssec_record(self):
1163+
pkb_client = PKBClient("key", "secret")
1164+
1165+
responses.post(
1166+
url=urljoin(API_ENDPOINT, "dns/deleteDnssecRecord/example.com/123456"),
1167+
json={"status": "SUCCESS"},
1168+
match=[
1169+
matchers.json_params_matcher(
1170+
{"apikey": "key", "secretapikey": "secret"}
1171+
)
1172+
],
1173+
)
1174+
1175+
success = pkb_client.delete_dnssec_record("example.com", 123456)
1176+
self.assertTrue(success)
1177+
10231178

10241179
if __name__ == "__main__":
10251180
unittest.main()

0 commit comments

Comments
 (0)