[NEW] add hytale.py - manage hytale server behaviour
This commit is contained in:
@@ -0,0 +1,152 @@
|
||||
#!/usr/bin/python3
|
||||
import os
|
||||
from pathlib import Path
|
||||
import argparse
|
||||
import json
|
||||
import requests
|
||||
from requests.packages.urllib3.exceptions import InsecureRequestWarning # pyright: ignore[reportMissingImports]
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
#DONE nettoyer et faire un script propre /main /fonctions /nommage
|
||||
#DONE Kill le serveur proprement (fonction) --parser
|
||||
|
||||
#globalconf
|
||||
base_post_url = "https://oauth.accounts.hytale.com/oauth2"
|
||||
headers = {"Content-Type":"application/x-www-form-urlencoded","User-Agent":"curl/7.81.0"}
|
||||
|
||||
SCRIPT_DIR = Path(__file__).parent.resolve()
|
||||
USER = os.environ["USER"]
|
||||
|
||||
# Change le répertoire de travail courant pour /opt/hytale/Server
|
||||
os.chdir(f'{SCRIPT_DIR}/Server')
|
||||
|
||||
def hytale_rest_post(url: str, data1=None):
|
||||
response = requests.post(base_post_url + url,headers=headers,data=data1,verify=True)
|
||||
return response
|
||||
|
||||
def hytale_rest_get(url: str, token=None):
|
||||
response = requests.get(url,headers={"Content-Type":"application/x-www-form-urlencoded","User-Agent":"curl/7.81.0", "Authorization": f"Bearer {token}"},verify=True)
|
||||
return response
|
||||
|
||||
def launch_server(credentials_file):
|
||||
if os.path.exists(f"{SCRIPT_DIR}/{credentials_file}"):
|
||||
with open(f"{SCRIPT_DIR}/{credentials_file}", 'r', encoding='utf-8') as f:
|
||||
my_file_data = json.load(f)
|
||||
else:
|
||||
print("No credentials file found, please generate credentials first")
|
||||
return 102
|
||||
|
||||
identity_token = my_file_data["identityToken"]
|
||||
session_token = my_file_data["sessionToken"]
|
||||
|
||||
# Lauch the server
|
||||
launch_status = subprocess.run(["bash", "-c", f"nohup java -jar HytaleServer.jar --auth-mode authenticated --session-token {session_token} --identity-token {identity_token} --assets ../Assets.zip --backup --backup-dir backups --backup-frequency 30 --allow-op --event-debug --ignore-broken-mods &> /var/log/hytale.log &"])
|
||||
|
||||
if launch_status.returncode == 0:
|
||||
print("Server launched successfully")
|
||||
return True
|
||||
|
||||
def kill_server():
|
||||
# Utilise pgrep pour trouver les processus java qui contiennent "HytaleServer.jar" dans leur ligne de commande
|
||||
result = subprocess.run(["pgrep", "-f", "java.*HytaleServer.jar.*"], capture_output=True, text=True)
|
||||
pids = result.stdout.split("\n")
|
||||
|
||||
# Tuer chaque processus trouvé
|
||||
for pid in pids :
|
||||
if pid == "" :
|
||||
pass
|
||||
else :
|
||||
os.kill(int(pid),9)
|
||||
print(f"{pid} killed" )
|
||||
|
||||
return True
|
||||
|
||||
def generate_credentials(credentials_file):
|
||||
print("Generating credentials...")
|
||||
# Step 1: Request Device Code
|
||||
device_code_response = hytale_rest_post("/device/auth",{"client_id": "hytale-server","scope": "openid offline auth:server"})
|
||||
my_device_code = device_code_response.json().get('device_code')
|
||||
|
||||
# Step 2: Poll for Token
|
||||
authentication_response = hytale_rest_post("/token",{"client_id":"hytale-server","grant_type":"urn:ietf:params:oauth:grant-type:device_code","device_code": my_device_code})
|
||||
print('Click on the link to approve the app: ' + device_code_response.json().get('verification_uri_complete'))
|
||||
|
||||
while authentication_response.status_code != 200:
|
||||
input("Approve the app and press enter to continue...")
|
||||
authentication_response = hytale_rest_post("/token",{"client_id":"hytale-server","grant_type":"urn:ietf:params:oauth:grant-type:device_code","device_code": my_device_code})
|
||||
|
||||
# Step 3: Get Profile Information
|
||||
my_token = authentication_response.json().get("access_token")
|
||||
my_profile = hytale_rest_get("https://account-data.hytale.com/my-account/get-profiles",my_token)
|
||||
my_uuid = my_profile.json().get("profiles")[0].get("uuid")
|
||||
|
||||
# Step 4: Create Game Session
|
||||
games_token_response = requests.post("https://sessions.hytale.com/game-session/new", headers={"Content-Type":"application/json","User-Agent":"curl/7.81.0", "Authorization": f"Bearer {my_token}"}, data=json.dumps({"uuid": my_uuid}))
|
||||
update_credentials(games_token_response.json(),credentials_file)
|
||||
|
||||
return True
|
||||
|
||||
def update_credentials(data: dict, credentials_file):
|
||||
if os.path.exists(f"{SCRIPT_DIR}/{credentials_file}"):
|
||||
with open(f"{SCRIPT_DIR}/{credentials_file}", 'r', encoding='utf-8') as f:
|
||||
my_file_data = json.load(f)
|
||||
else:
|
||||
my_file_data = {}
|
||||
|
||||
# Mise à jour des valeurs si elles sont présentes dans data
|
||||
if 'identityToken' in data:
|
||||
my_file_data["identityToken"] = data['identityToken']
|
||||
if 'sessionToken' in data:
|
||||
my_file_data["sessionToken"] = data['sessionToken']
|
||||
|
||||
# Écriture dans le fichier
|
||||
with open(f"{SCRIPT_DIR}/{credentials_file}", 'w', encoding='utf-8') as f:
|
||||
json.dump(my_file_data, f, indent=4, ensure_ascii=False)
|
||||
|
||||
print(f"Fichier {credentials_file} mis à jour avec succès !")
|
||||
|
||||
def refresh_credentials(credentials_file) :
|
||||
if os.path.exists(f"{SCRIPT_DIR}/{credentials_file}"):
|
||||
with open(f"{SCRIPT_DIR}/{credentials_file}", 'r', encoding='utf-8') as f:
|
||||
my_file_data = json.load(f)
|
||||
refresh_session = requests.post("https://sessions.hytale.com/game-session/refresh",headers={"Content-Type":"application/json","User-Agent":"curl/7.81.0", "Authorization": f"Bearer {my_file_data["sessionToken"]}"})
|
||||
|
||||
if refresh_session.status_code == 200 :
|
||||
print ("Refresh session validated!")
|
||||
update_credentials(refresh_session.json(),credentials_file)
|
||||
|
||||
def new_game_session ():
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
parser = argparse.ArgumentParser(description="Manage Hytale Server")
|
||||
parser.add_argument("-Rs", "--relaunch-server",action="store_true")
|
||||
parser.add_argument("-Ko","--kill-server",action="store_true")
|
||||
parser.add_argument("-Rt", "--refresh-game-token",action="store_true")
|
||||
parser.add_argument("-Nr", "--no-refresh",action="store_true")
|
||||
parser.add_argument("-C", "--credentials-file",default="default_credentials.json")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.relaunch_server :
|
||||
kill_server()
|
||||
print("Server has been killed properly, let's continue")
|
||||
|
||||
if args.kill_server :
|
||||
kill_server()
|
||||
print("Server has been killed properly, exit")
|
||||
sys.exit(0)
|
||||
|
||||
if args.refresh_game_token :
|
||||
refresh_credentials(args.credentials_file)
|
||||
sys.exit(0)
|
||||
|
||||
if not args.no_refresh:
|
||||
generate_credentials(args.credentials_file)
|
||||
|
||||
launch_server(args.credentials_file)
|
||||
|
||||
main()
|
||||
Reference in New Issue
Block a user