Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

关于直接获取鸽游服务器存档信息 #3

Closed
zxjwzn opened this issue Sep 2, 2023 · 2 comments
Closed

关于直接获取鸽游服务器存档信息 #3

zxjwzn opened this issue Sep 2, 2023 · 2 comments

Comments

@zxjwzn
Copy link

zxjwzn commented Sep 2, 2023

我记得有个仓库可以实现
https://github.com/7aGiven/PhigrosLibrary
里面有个py的目录可以获取数据,使用的时候要把difficulty.csv也下载下来
不过程序貌似有点问题,我个人修改了一下
这是PhigrosLibrary.py

import base64
from Crypto.Cipher import AES
from Crypto.Util import Padding
import io
import struct
import zipfile

# Labels for the chart levels; ByteReader.readRecord indexes this list with the
# bit position it tests in a record's bitmask bytes.
levels = ["EZ", "HD", "IN", "AT"]
# Song id -> list of per-level difficulty constants (kept as the strings read
# from the CSV). Filled by B19Class.read_difficulty, read by readRecord.
difficulty = {}

def getBool(num, index):
    """Return True when bit ``index`` (0 = least significant) of ``num`` is set."""
    shifted = num >> index
    return (shifted & 1) == 1

def print_hex(data):
    """Print ``data`` as space-separated, two-digit, upper-case hex values."""
    pairs = []
    for value in data:
        pairs.append(f"{value:02X}")
    print(" ".join(pairs))

class ByteReader:
    """Sequential cursor over a decrypted ``gameRecord`` byte string.

    Every ``read*`` method decodes one value starting at ``self.position`` and
    advances ``self.position`` past the bytes it consumed.
    """

    def __init__(self, data: bytes):
        self.data = data  # buffer being parsed
        self.position = 0  # index of the next unread byte

    def readVarShort(self):
        """Read a one- or two-byte variable-length unsigned integer.

        A first byte below 128 is the value itself. Otherwise its low seven
        bits are the low bits of the value and the next byte supplies the bits
        above them.

        Raises:
            IndexError: if the buffer ends before the value is complete.
        """
        low = self.data[self.position]
        if low < 128:
            self.position += 1
            return low
        high = self.data[self.position + 1]
        self.position += 2
        # Returning the first byte alone is only right for 128..255; larger
        # values need the second byte folded in.
        return (low & 0x7F) | (high << 7)

    def readString(self):
        """Read a string stored as a single length byte followed by that many bytes.

        The payload is decoded with the default codec of ``bytes.decode``.
        """
        length = self.data[self.position]
        self.position += length + 1
        return self.data[self.position - length:self.position].decode()

    def readScoreAcc(self):
        """Read eight bytes as a 32-bit int score followed by a 32-bit float accuracy.

        Returns:
            dict: ``{"score": int, "acc": float}``.
        """
        self.position += 8
        # "<" fixes the byte order and packing instead of inheriting the host
        # platform's native order and alignment.
        score, acc = struct.unpack("<if", self.data[self.position - 8:self.position])
        return {"score": score, "acc": acc}

    def readRecord(self, songId):
        """Read the per-level results stored for one song.

        Layout as consumed here: one length byte, one "exists" bitmask byte,
        one "fc" bitmask byte, then one score/accuracy pair for each level
        whose bit is set in the "exists" mask.

        Args:
            songId: key into the module-level ``difficulty`` table.

        Returns:
            list[dict]: one dict per stored level with the keys ``score``,
            ``acc``, ``level``, ``fc``, ``songId``, ``difficulty`` and ``rks``.

        Raises:
            KeyError: if ``songId`` is missing from ``difficulty``.
        """
        # The length byte lets us land on the next record even if this one
        # carries bytes that are not parsed below.
        end_position = self.position + self.data[self.position] + 1
        self.position += 1
        exists = self.data[self.position]
        self.position += 1
        fc = self.data[self.position]
        self.position += 1
        ratings = difficulty[songId]
        records = []
        for level, rating in enumerate(ratings):
            if not getBool(exists, level):
                continue
            entry = self.readScoreAcc()
            entry["level"] = levels[level]
            entry["fc"] = getBool(fc, level)
            entry["songId"] = songId
            entry["difficulty"] = rating
            # NOTE(review): the squared term is positive for acc below 55 as
            # well -- confirm whether low accuracies should score zero instead.
            factor = (entry["acc"] - 55) / 45
            entry["rks"] = float(factor) * float(factor) * float(rating)
            records.append(entry)
        self.position = end_position
        return records

# Base HTTP headers for the save-data API requests. B19Class methods copy this
# dict and add the per-user "X-LC-Session" header before each call.
global_headers = {
    "X-LC-Id": "rAK3FfdieFob2Nn8Am",
    "X-LC-Key": "Qr9AEqtuoSVS3zeD6iVbM4ZC0AtkJcQ89tywVyi0",
    "User-Agent": "LeanCloud-CSharp-SDK/1.0.3",
    "Accept": "application/json"
}

async def readGameRecord(client, url):
    """Download a save archive and return its raw ``gameRecord`` payload.

    Args:
        client: object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``read()``; the accompanying
            test script passes an ``aiohttp.ClientSession``.
        url: location of the zip archive holding the save.

    Returns:
        bytes: contents of the ``gameRecord`` archive member after its leading
        version byte (still encrypted).

    Raises:
        ValueError: if the version byte is not ``0x01``.
        KeyError: if the archive has no ``gameRecord`` member.
    """
    async with client.get(url) as response:
        archive_bytes = await response.read()
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
        with archive.open("gameRecord") as gameRecord_file:
            if gameRecord_file.read(1) != b"\x01":
                # Raising a bare string is itself a TypeError in Python 3 and
                # would hide this message; raise a real exception instead.
                raise ValueError("版本号不正确,可能协议已更新。")
            return gameRecord_file.read()

# AES key and initialisation vector (stored base64-encoded) that
# decrypt_gameRecord passes to AES in CBC mode.
key = base64.b64decode("6Jaa0qVAJZuXkZCLiOa/Ax5tIZVu+taKUN1V1nqwkks=")
iv = base64.b64decode("Kk/wisgNYwcAV8WVGMgyUw==")

def decrypt_gameRecord(gameRecord):
    """Decrypt an AES-CBC ``gameRecord`` payload and strip its block padding.

    Uses the module-level ``key`` and ``iv``. Returns the unpadded plaintext
    bytes.
    """
    cipher = AES.new(key, AES.MODE_CBC, iv)
    padded_plaintext = cipher.decrypt(gameRecord)
    return Padding.unpad(padded_plaintext, AES.block_size)

def parse_b19(gameRecord):
    """Build the "best 19" list from a decrypted ``gameRecord`` payload.

    Args:
        gameRecord: decrypted record bytes, starting with the song count.

    Returns:
        list[dict]: if any record has a score of exactly 1,000,000, the one
        with the highest numeric difficulty comes first; it is followed by the
        (up to) 19 records with the highest ``rks``. When no such
        1,000,000-point record exists that leading entry is simply omitted.

    Raises:
        KeyError: if a stored song id is missing from the ``difficulty`` table.
    """
    reader = ByteReader(gameRecord)
    records = []
    for _ in range(reader.readVarShort()):
        # The stored name carries two trailing characters that are not part of
        # the difficulty-table key (presumably a fixed suffix -- TODO confirm).
        songId = reader.readString()[:-2]
        records.extend(reader.readRecord(songId))
    records.sort(key=lambda rec: rec["rks"], reverse=True)

    b19 = []
    full_scores = [rec for rec in records if rec["score"] == 1000000]
    if full_scores:
        # Difficulty values may be CSV strings; compare them numerically so
        # that e.g. "9.5" does not outrank "15.8".
        b19.append(max(full_scores, key=lambda rec: float(rec["difficulty"])))
    b19.extend(records[:19])
    return b19

class B19Class:
    """Fetches a player's cloud save and derives profile and best-19 data.

    ``client`` is an HTTP session whose ``get(...)`` returns an async context
    manager; the accompanying test script passes an ``aiohttp.ClientSession``.
    """

    def __init__(self, client):
        self.client = client  # HTTP session used for every request

    def read_difficulty(self, path):
        """Reload the module-level ``difficulty`` table from a CSV file.

        Each line is ``song_id,value[,value...]``. The table maps the first
        column to the list of remaining columns, kept as strings. Lines without
        at least one value column are ignored. Previous contents are discarded.

        Args:
            path: location of the CSV file (read as UTF-8; undecodable bytes
                are replaced).
        """
        difficulty.clear()
        with open(path, encoding="utf-8", errors="replace") as f:
            for raw_line in f:
                # Strip only the line terminator: slicing off the last
                # character would eat real data when the final line has no
                # trailing newline.
                fields = raw_line.rstrip("\r\n").split(",")
                if len(fields) < 2:
                    continue
                difficulty[fields[0]] = fields[1:]

    async def get_playerId(self, sessionToken):
        """Return the ``nickname`` field of the user owning ``sessionToken``."""
        headers = global_headers.copy()
        headers["X-LC-Session"] = sessionToken
        async with self.client.get("https://rak3ffdi.cloud.tds1.tapapis.cn/1.1/users/me", headers=headers) as response:
            result = (await response.json())["nickname"]
        return result

    async def get_summary(self, sessionToken):
        """Fetch the first save entry of the user and decode its summary blob.

        Returns:
            dict: ``updateAt`` and ``url`` taken from the JSON entry, plus the
            fields unpacked from the base64 ``summary``: ``saveVersion``,
            ``challenge``, ``rks``, ``gameVersion``, ``avatar`` and one
            3-tuple of 16-bit counters for each of ``EZ``/``HD``/``IN``/``AT``
            (the meaning of the three counters is not shown here).

        Raises:
            IndexError: if the response contains no save entries.
        """
        headers = global_headers.copy()
        headers["X-LC-Session"] = sessionToken
        async with self.client.get("https://rak3ffdi.cloud.tds1.tapapis.cn/1.1/classes/_GameSave", headers=headers) as response:
            result = (await response.json())["results"][0]
        updateAt = result["updatedAt"]
        url = result["gameFile"]["url"]
        summary = base64.b64decode(result["summary"])
        # Byte 8 is the length of the avatar string: the "x" skips that byte
        # and "%ds" then consumes exactly that many bytes.
        summary = struct.unpack("=BHfBx%ds12H" % summary[8], summary)
        return {"updateAt": updateAt, "url": url, "saveVersion": summary[0], "challenge": summary[1], "rks": summary[2], "gameVersion": summary[3], "avatar": summary[4].decode(), "EZ": summary[5:8], "HD": summary[8:11], "IN": summary[11:14], "AT": summary[14:17]}

    async def get_b19(self, url):
        """Download, decrypt and parse the save at ``url`` into the b19 list."""
        gameRecord = await readGameRecord(self.client, url)
        gameRecord = decrypt_gameRecord(gameRecord)
        return parse_b19(gameRecord)

这是test.py

import aiohttp
import asyncio
import sys
from PhigrosLibrary import B19Class

async def main(sessionToken):
    """Fetch one player's save and print their profile, b19 entries and average.

    Prints the nickname, the save's upload time and URL, then one block per
    b19 entry (ordered by score, highest first), and finally the sum of the
    entries' rks divided by 20.
    """
    async with aiohttp.ClientSession() as session:
        api = B19Class(session)
        api.read_difficulty("../difficulty.csv")
        nickname = await api.get_playerId(sessionToken)
        save_info = await api.get_summary(sessionToken)
        print("玩家名称:" + nickname)
        print("存档上传日期:" + save_info["updateAt"])
        print("存档地址:" + save_info["url"])
        entries = await api.get_b19(save_info["url"])

    total_rks = 0
    ordered = sorted(entries, key=lambda entry: entry["score"], reverse=True)
    for entry in ordered:
        total_rks += entry["rks"]
        acc_text = str(round(entry["acc"], 2))
        rks_text = str(round(entry["rks"], 2))
        pieces = [
            "名称:", entry["songId"],
            "\n分数:", str(entry["score"]),
            "\n准度:", acc_text, "%",
            "\n难度:", entry["level"],
            "\n定数:", entry["difficulty"],
            "\n单曲rks:", rks_text,
            "\n-------------",
        ]
        print("".join(pieces))
    print(total_rks / 20)

# Take the session token from the single command-line argument, if given;
# otherwise fall back to an empty token.
s = sys.argv[1] if len(sys.argv) == 2 else ""
asyncio.run(main(s))

s里填的是25位的token,这样就可以直接从鸽游服务器上获取数据
token的获取可以参考以下文章里的账号绑定内容
https://mivik.moe/pgr-bot-help/

@zxjwzn zxjwzn closed this as completed Sep 2, 2023
@zxjwzn
Copy link
Author

zxjwzn commented Sep 2, 2023

我个人没什么能力,加上机器人账号被封,所以心有余而力不足了
希望作者可以整合一下这个功能

@zxjwzn zxjwzn reopened this Sep 2, 2023
@SEVEN-6174
Copy link
Owner

十分感谢
之前一直想着获取数据,结果没办法

@zxjwzn zxjwzn closed this as completed Feb 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants