-
Notifications
You must be signed in to change notification settings - Fork 0
/
Steam_login.py
133 lines (113 loc) · 5.43 KB
/
Steam_login.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import base64
import requests
from steampy import guard
import rsa
import json
class LoginExecutor:
def __init__(self, username: str, password: str, shared_secret: str, session: requests.Session = requests.Session()) -> None:
self.username = username
self.password = password
self.shared_secret = shared_secret
self.session = session
self.client_id = ''
self.steamid = ''
self.request_id = ''
self.refresh_token = ''
def login(self) -> requests.Session:
login_response = self._send_login_request() # 创建登录会话
self._update_stem_guard(login_response)
self._pool_sessions_steam()
finallized_response = self._finallez_login()
self._set_tokens(finallized_response)
self.set_sessionid_cookies()
return self.session
def _send_login_request(self) -> requests.Response:
rsa_params = self._fetch_rsa_params()
data = {
'persistence': "1",
'encrypted_password': rsa_params['encrypted_password'],
'account_name': self.username,
'encryption_timestamp': rsa_params['rsa_timestamp'],
}
response = self.session.post("https://api.steampowered.com/IAuthenticationService/BeginAuthSessionViaCredentials/v1", data=data)
return response
def set_sessionid_cookies(self):
sessionid = self.session.cookies.get_dict().get('sessionid')
community_cookie = {"name": "sessionid",
"value": sessionid,
"domain": 'steamcommunity.com'}
self.session.cookies.set(**community_cookie)
store_cookie = {"name": "sessionid",
"value": sessionid,
"domain": 'store.steampowered.com'}
self.session.cookies.set(**store_cookie)
def _fetch_rsa_params(self, retry: int = 3) -> dict:
self.session.post("https://steamcommunity.com") # 获得第一个Cookies
response = self.session.get("https://api.steampowered.com/IAuthenticationService/GetPasswordRSAPublicKey/v1/?account_name=" + self.username)
key_response = json.loads(response.text)
for i in range(retry):
try:
rsa_mod = int(key_response["response"]['publickey_mod'], 16)
rsa_exp = int(key_response["response"]['publickey_exp'], 16)
rsa_timestamp = key_response["response"]['timestamp']
rsa_key = rsa.PublicKey(rsa_mod, rsa_exp)
encrypted_password = base64.b64encode(rsa.encrypt(self.password.encode('utf-8'), rsa_key))
return {'encrypted_password': encrypted_password,
'rsa_timestamp': rsa_timestamp}
except KeyError:
if retry >= 2:
raise ValueError('Could not obtain rsa-key')
def _update_stem_guard(self, login_response):
response_json = json.loads(login_response.text).get('response')
self.client_id = response_json.get('client_id')
self.steamid = response_json.get('steamid')
self.request_id = response_json.get('request_id')
code = guard.generate_one_time_code(self.shared_secret)
update_data = {
'client_id': self.client_id,
'steamid': self.steamid,
'code_type': 3,
'code': code
}
response = self.session.post("https://api.steampowered.com/IAuthenticationService/UpdateAuthSessionWithSteamGuardCode/v1/", data=update_data)
def _pool_sessions_steam(self):
pool_data = {
'client_id': self.client_id,
'request_id': self.request_id
}
response = self.session.post("https://api.steampowered.com//IAuthenticationService/PollAuthSessionStatus/v1/", data=pool_data)
response_json = json.loads(response.text)
self.refresh_token = response_json.get("response").get("refresh_token")
def _finallez_login(self) -> requests.Response:
try:
sessionid = self.session.cookies.get_dict().get('sessionid')
except Exception:
print("获取sessionid失败", self.session.cookies.get_dict())
raise Exception("Steam_login.py , 获取sessionid失败")
else:
redir = "https://steamcommunity.com/login/home/?goto="
finallez_data = {
'nonce': self.refresh_token,
'sessionid': sessionid,
'redir': redir
}
response = self.session.post("https://login.steampowered.com/jwt/finalizelogin", data=finallez_data)
return response
def _set_tokens(self, fin_resp: requests.Response):
response_json = json.loads(fin_resp.text)
transfer_info = response_json.get("transfer_info")
for item in transfer_info[:2]:
params = item.get('params')
data = {
'nonce': params.get("nonce"),
'auth': params.get("auth"),
'steamID': self.steamid
}
response = self.session.post(item.get('url'), data=data)
def test_is_logined(self):
resp = self.session.get(f"https://steamcommunity.com/profiles/{self.steamid}/home")
print(resp.text)
if __name__ == '__main__':
login = LoginExecutor('', '', '=')
session = login.login()
login.test_is_logined()