Skip to content

Commit a8d27c1

Browse files
author
danny
committed
Waiting for anisette server. Using fallback for SMS2FA
1 parent 2c847ce commit a8d27c1

File tree

2 files changed

+77
-34
lines changed

2 files changed

+77
-34
lines changed

endpoint/mh_endpoint.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import os
77
import ssl
8+
import sys
89
import time
910
from collections import OrderedDict
1011
from datetime import datetime, timezone
@@ -29,9 +30,9 @@ def addCORSHeaders(self):
2930
self.send_header("Access-Control-Allow-Private-Network","true")
3031

3132
def authenticate(self):
32-
user = config.getEndpointUser()
33-
passw = config.getEndpointPass()
34-
if (user is None or user == "") and (passw is None or passw == ""):
33+
endpoint_user = config.getEndpointUser()
34+
endpoint_pass = config.getEndpointPass()
35+
if (endpoint_user is None or endpoint_user == "") and (endpoint_pass is None or endpoint_pass == ""):
3536
return True
3637

3738
auth_header = self.headers.get('authorization')
@@ -40,7 +41,7 @@ def authenticate(self):
4041
if auth_type.lower() == 'basic':
4142
auth_decoded = base64.b64decode(auth_encoded).decode('utf-8')
4243
username, password = auth_decoded.split(':', 1)
43-
if username == user and password == passw:
44+
if username == endpoint_user and password == endpoint_pass:
4445
return True
4546

4647
return False
@@ -149,8 +150,26 @@ def getAuth(regenerate=False, second_factor='sms'):
149150
return j['dsid'], j['searchPartyToken']
150151

151152

153+
154+
def check_if_anisette_is_reachable(max_retries=3, retry_delay=10):
155+
server_url = config.getAnisetteServer()
156+
logging.info(f'Checking if Anisette {server_url} is reachable')
157+
for attempt in range(max_retries):
158+
try:
159+
response = requests.get(server_url, timeout=5)
160+
response.raise_for_status()
161+
return
162+
except (requests.RequestException, requests.HTTPError) as e:
163+
logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
164+
if attempt < max_retries - 1:
165+
logger.error(f"Retrying in {retry_delay} seconds...")
166+
time.sleep(retry_delay)
167+
logger.error(f"Max retries reached. Program will exit. Make sure your Anisette is reachable and start again with 'docker start -ai macless-haystack'")
168+
sys.exit()
169+
152170
if __name__ == "__main__":
153-
logging.debug(f'Searching for token at ' + config.getConfigFile())
171+
check_if_anisette_is_reachable()
172+
logging.info(f'Searching for token at ' + config.getConfigFile())
154173
if not os.path.exists(config.getConfigFile()):
155174
logging.info(f'No auth-token found.')
156175
apple_cryptography.registerDevice()

endpoint/register/pypush_gsa_icloud.py

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import time
2+
13
import urllib3
24
from getpass import getpass
35
import plistlib as plist
@@ -37,6 +39,7 @@ def icloud_login_mobileme(username='', password=''):
3739
username = input('Apple ID: ')
3840
if not password:
3941
password = getpass('Password: ')
42+
4043
g = gsa_authenticate(username, password)
4144
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
4245
adsid = g["adsid"]
@@ -71,10 +74,10 @@ def icloud_login_mobileme(username='', password=''):
7174
def gsa_authenticate(username, password):
7275
# Password is None as we'll provide it later
7376
usr = srp.User(username, bytes(), hash_alg=srp.SHA256, ng_type=srp.NG_2048)
74-
_, A = usr.start_authentication()
77+
_, a = usr.start_authentication()
7578
logger.info("Authentication request initialization")
7679
r = gsa_authenticated_request(
77-
{"A2k": A, "ps": ["s2k", "s2k_fo"], "u": username, "o": "init"})
80+
{"A2k": a, "ps": ["s2k", "s2k_fo"], "u": username, "o": "init"})
7881

7982
if r["sp"] not in ["s2k", "s2k_fo"]:
8083
logger.warning(f"This implementation only supports s2k and sk2_fo. Server returned {r['sp']}")
@@ -219,15 +222,13 @@ def decrypt_cbc(usr, data):
219222
return padder.update(data) + padder.finalize()
220223

221224

225+
WAITING_TIME = 60
226+
227+
222228
def sms_second_factor(dsid, idms_token):
223229
identity_token = base64.b64encode(
224230
(dsid + ":" + idms_token).encode()).decode()
225231

226-
# TODO: Actually do this request to get user prompt data
227-
# a = requests.get("https://gsa.apple.com/auth", verify=False)
228-
# This request isn't strictly necessary though,
229-
# and most accounts should have their id 1 SMS, if not contribute ;)
230-
231232
headers = {
232233
"User-Agent": "Xcode",
233234
"Accept-Language": "en-us",
@@ -238,39 +239,46 @@ def sms_second_factor(dsid, idms_token):
238239
}
239240

240241
headers.update(generate_anisette_headers())
241-
242+
242243
# Extract the "boot_args" from the auth page to get the id of the trusted phone number
243244
pattern = r'<script.*class="boot_args">\s*(.*?)\s*</script>'
244245
auth = requests.get("https://gsa.apple.com/auth", headers=headers, verify=False)
245246
sms_id = 1
246247
match = re.search(pattern, auth.text, re.DOTALL)
247248
if match:
248249
boot_args = json.loads(match.group(1).strip())
249-
try:
250+
try:
250251
sms_id = boot_args["direct"]["phoneNumberVerification"]["trustedPhoneNumber"]["id"]
251-
except KeyError as e:
252-
logger.debug(match.group(1).strip())
253-
logger.error("Key for sms id not found. Using the first phone number")
252+
except KeyError as e:
253+
logger.debug(match.group(1).strip())
254+
logger.error("Key for sms id not found. Using the first phone number")
254255
else:
255-
logger.debug(auth.text)
256-
logger.error("Script for sms id not found. Using the first phone number")
257-
258-
logger.info(f"Using phone with id {sms_id} for SMS2FA")
259-
body = {"phoneNumber": {"id": sms_id }, "mode": "sms"}
260-
261-
# This will send the 2FA code to the user's phone over SMS
262-
# We don't care about the response, it's just some HTML with a form for entering the code
263-
# Easier to just use a text prompt
264-
requests.put(
265-
"https://gsa.apple.com/auth/verify/phone/",
266-
json=body,
267-
headers=headers,
268-
verify=False,
269-
timeout=5
270-
)
256+
logger.debug(auth.text)
257+
logger.error("Script for sms id not found. Using the first phone number")
271258

259+
logger.info(f"Using phone with id {sms_id} for SMS2FA")
260+
body = {"phoneNumber": {"id": sms_id}, "mode": "sms"}
261+
for handler in logger.handlers:
262+
handler.flush()
272263
# Prompt for the 2FA code. It's just a string like '123456', no dashes or spaces
273-
code = input("Enter SMS 2FA code: ")
264+
start_time = time.perf_counter()
265+
code = input(
266+
f"Enter SMS 2FA code (If you do not receive a code, wait {WAITING_TIME}s and press Enter. An attempt will be made to request the SMS in another way.): ")
267+
end_time = time.perf_counter()
268+
269+
if code == "":
270+
elapsed_time = int(end_time - start_time)
271+
if elapsed_time < WAITING_TIME:
272+
waiting_time = WAITING_TIME - elapsed_time
273+
logger.info(
274+
f"You only waited {elapsed_time} seconds. The next request will be started in {waiting_time} seconds")
275+
time.sleep(waiting_time)
276+
code = input(f"Enter SMS 2FA code if you have received it in the meantime, otherwise press Enter: ")
277+
278+
if code == "":
279+
code = request_code(headers)
280+
else:
281+
code = request_code(headers)
274282

275283
body['securityCode'] = {'code': code}
276284

@@ -294,3 +302,19 @@ def sms_second_factor(dsid, idms_token):
294302
else:
295303
raise Exception(
296304
"2FA unsuccessful. Maybe wrong code or wrong number. Check your account details.")
305+
306+
307+
def request_code(headers):
308+
# This will send the 2FA code to the user's phone over SMS
309+
# We don't care about the response, it's just some HTML with a form for entering the code
310+
# Easier to just use a text prompt
311+
body = {"phoneNumber": {"id": 1}, "mode": "sms"}
312+
requests.put(
313+
"https://gsa.apple.com/auth/verify/phone/",
314+
json=body,
315+
headers=headers,
316+
verify=False,
317+
timeout=5
318+
)
319+
code = input(f"Enter SMS 2FA code:")
320+
return code

0 commit comments

Comments
 (0)