diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..457d305
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+*.pth
+*.onnx
+*.pt
+/__pycache__/*
diff --git a/model.py b/model.py
index 82b58a5..a24c099 100644
--- a/model.py
+++ b/model.py
@@ -1,12 +1,15 @@
 import os
+import math
 import torch
 import torch.nn as nn
 import numpy as np
 import random
+
 
 BOARD_SIZE = 8 # board size
 WIN_CONDITION = 5 # number of stones in a row needed to win
+
 
 # Game environment
 class Gomoku:
     def __init__(self):
@@ -14,11 +17,13 @@ def __init__(self):
         self.current_player = 1
         self.winning_line = []
 
+
     def reset(self):
         self.board.fill(0)
         self.current_player = 1
         self.winning_line = []
 
+
     def is_winning_move(self, x, y):
         # check the five-in-a-row win condition
         def count_consecutive(player, dx, dy):
@@ -33,6 +38,7 @@ def count_consecutive(player, dx, dy):
                 break
             return count, line
 
+
         player = self.board[x, y]
         directions = [(1, 0), (0, 1), (1, 1), (1, -1)]
         for dx, dy in directions:
@@ -43,81 +49,85 @@ def count_consecutive(player, dx, dy):
                 return True
         return False
 
+
     def step(self, action):
         # parse the action into coordinates on the board
         x, y = action // BOARD_SIZE, action % BOARD_SIZE
         # check whether the target position is already occupied
         if self.board[x, y] != 0:
-            return -1, True
+            return -1, True, 0
         # place the stone
         self.board[x, y] = self.current_player
         if self.is_winning_move(x, y):
-            return self.current_player, True
+            if self.current_player == 1:
+                return 1, True, 10000 # Player 1 wins with five in a row
+            else:
+                return 2, True, -10000 # Player 2 wins with five in a row
 
         # switch to the other player: 1 becomes 2, 2 becomes 1
         self.current_player = 3 - self.current_player
         # intermediate reward (score) mechanism
         score = self.evaluate_board()
-        return score, False
+        return self.board[x, y], False, score
 
-    def evaluate_board(self):
-        score = 0
-        directions = [(1, 0), (0, 1), (1, 1), (1, -1)]
-        def evaluate_line(player, x, y, dx, dy):
-            count = 1
-            block = 0
-            for step in range(1, WIN_CONDITION):
-                nx, ny = x + dx * step, y + dy * step
-                if 0 <= nx < BOARD_SIZE and 0 <= ny < BOARD_SIZE:
-                    if self.board[nx, ny] == player:
-                        count += 1
-                    elif self.board[nx, ny] == 0:
-                        break
-                    else:
-                        block += 1
-                        break
-                else:
-                    block += 1
-                    break
-            for step in range(1, WIN_CONDITION):
-                nx, ny = x - dx * step, y - dy * step
-                if 0 <= nx < BOARD_SIZE and 0 <= ny < BOARD_SIZE:
-                    if self.board[nx, ny] == player:
-                        count += 1
-                    elif self.board[nx, ny] == 0:
-                        break
-                    else:
-                        block += 1
-                        break
+    def evaluate_board(self):
+        def count_consecutive(player, x, y, dx, dy):
+            """
+            Count the player's consecutive stones in a given direction.
+            :param player: player number (1 or 2)
+            :param x: starting x coordinate
+            :param y: starting y coordinate
+            :param dx: x-direction increment
+            :param dy: y-direction increment
+            :return: number of consecutive stones
+            """
+            count = 0
+            for step in range(WIN_CONDITION):
+                nx = x + dx * step
+                ny = y + dy * step
+                if 0 <= nx < BOARD_SIZE and 0 <= ny < BOARD_SIZE and self.board[nx, ny] == player:
+                    count += 1
                 else:
-                    block += 1
                     break
-            return count, block
+            return count
 
-        for i in range(BOARD_SIZE):
-            for j in range(BOARD_SIZE):
-                if self.board[i, j] != 0:
-                    player = self.board[i, j]
-                    for dx, dy in directions:
-                        count, block = evaluate_line(player, i, j, dx, dy)
-                        if count >= WIN_CONDITION:
-                            score += 10000
-                        elif count == 4 and block == 0:
-                            score += 500
-                        elif count == 4 and block == 1:
-                            score += 100
-                        elif count == 3 and block == 0:
-                            score += 50
-                        elif count == 3 and block == 1:
-                            score += 10
-                        elif count == 2 and block == 0:
-                            score += 5
-                        elif count == 2 and block == 1:
-                            score += 1
+
+        directions = [(1, 0), (0, 1), (1, 1), (1, -1)]
+        score = 0
+        for x in range(BOARD_SIZE):
+            for y in range(BOARD_SIZE):
+                player = self.board[x, y]
+                if player == 0:
+                    continue
+                for dx, dy in directions:
+                    count = count_consecutive(player, x, y, dx, dy)
+                    if count == 5: # five in a row
+                        score += 10000
+                    elif count == 4: # four in a row
+                        score += 500
+                    elif count == 3: # three in a row
+                        score += 100
+                    elif count == 2: # two in a row
+                        score += 10
+        return score
+
+    def simulate_move(self, action):
+        x, y = action // BOARD_SIZE, action % BOARD_SIZE
+        if self.board[x, y] != 0:
+            return False
+        self.board[x, y] = self.current_player
+        self.current_player = 3 - self.current_player
+        return True
+
+
+    def evaluate_state(self):
+        return self.evaluate_board()
+
+
     def print_board(self):
         for i in range(BOARD_SIZE):
             row = ''
@@ -129,6 +139,7 @@ def print_board(self):
             print(row)
         print()
 
+
 # Version #1
 class GomokuNetV1(nn.Module):
     def __init__(self):
@@ -137,12 +148,14 @@ def __init__(self):
         self.fc2 = nn.Linear(256, 256)
         self.fc3 = nn.Linear(256, BOARD_SIZE * BOARD_SIZE)
 
+
    def forward(self, x):
         x = torch.relu(self.fc1(x))
         x = torch.relu(self.fc2(x))
         x = self.fc3(x)
         return x
 
+
 # Convolutional neural network (CNN)
 class GomokuNetV2(nn.Module):
     def __init__(self):
@@ -152,6 +165,7 @@ def __init__(self):
         self.fc1 = nn.Linear(128 * BOARD_SIZE * BOARD_SIZE, 256)
         self.fc2 = nn.Linear(256, BOARD_SIZE * BOARD_SIZE)
 
+
     def forward(self, x):
         x = torch.relu(self.conv1(x.view(-1, 1, BOARD_SIZE, BOARD_SIZE)))
         x = torch.relu(self.conv2(x))
@@ -160,16 +174,19 @@ def forward(self, x):
         x = self.fc2(x)
         return x
 
+
 def get_valid_action(logits, board, epsilon=0.1):
     logits = logits.flatten() # flatten the logits so their shape is (BOARD_SIZE * BOARD_SIZE,)
     valid_actions = [(logits[i].item(), i) for i in range(BOARD_SIZE * BOARD_SIZE) if board[i // BOARD_SIZE, i % BOARD_SIZE] == 0]
     valid_actions.sort(reverse=True, key=lambda x: x[0]) # sort by logit value in descending order
 
+
     if random.random() < epsilon:
         return random.choice(valid_actions)[1] if valid_actions else -1
     else:
         return valid_actions[0][1] if valid_actions else -1
 
+
 def load_model_if_exists(model, file_path):
     if os.path.exists(file_path):
         model.load_state_dict(torch.load(file_path))
diff --git a/train.py b/train.py
index 7e77a6f..0bf109f 100644
--- a/train.py
+++ b/train.py
@@ -1,79 +1,165 @@
+import random
 import torch
 import torch.nn as nn
 import torch.optim as optim
 from model import Gomoku, GomokuNetV2, get_valid_action, load_model_if_exists
 
-NEED_PRINT_BOARD = False # print the board
-# flag controlling whether to use the GPU
-USE_GPU = torch.cuda.is_available()
-print("USE_GPU:", USE_GPU)
 import random
+
 
 def get_random_smaller_thousand_multiple(number):
-    if number <= 1000:
-        return "input number must > 1000。"
-    # build the list of non-zero multiples of 1000 smaller than the input
-    multiples = [i for i in range(1000, number, 1000)]
-    return random.choice(multiples) if multiples else "has no number"
+    """
+    Build the list of non-zero multiples of 1000 smaller than the input and pick one at random with linear weighting.
+    :param number: the input number
+    :return: the chosen multiple, or a message if no valid multiple exists
+    """
+    if number < 1000:
+        return "input number must >= 1000."
+    # when number equals 1000, initialize multiples to contain just 1000
+    if number == 1000:
+        multiples = [1000]
+    else:
+        multiples = [i for i in range(1000, number, 1000)]
+    # compute a weight for each element; a linear distribution gives larger numbers higher weight
+    weights = [i for i in range(1, len(multiples) + 1)]
+    # randomly choose one element according to the computed weights
+    return random.choices(multiples, weights=weights)[0] if multiples else "has no number"
 
-def train():
-    device = torch.device("cuda" if USE_GPU else "cpu")
-    env = Gomoku()
+
+def setup_device():
+    """
+    Check whether a GPU is available and set up the device.
+    :return: the device object
+    """
+    use_gpu = torch.cuda.is_available()
+    print("USE_GPU:", use_gpu)
+    return torch.device("cuda" if use_gpu else "cpu")
+
+
+def setup_players_and_optimizers(device):
+    """
+    Initialize the player models and optimizers.
+    :param device: compute device
+    :return: player 1 model, player 2 model, player 1 optimizer, player 2 optimizer
+    """
     model1 = GomokuNetV2().to(device)
-    model2 = GomokuNetV2().to(device) # sparring-partner model
+    model2 = GomokuNetV2().to(device)
     optimizer1 = optim.Adam(model1.parameters())
     optimizer2 = optim.Adam(model2.parameters())
-    criterion = nn.CrossEntropyLoss()
+    return model1, model2, optimizer1, optimizer2
+
+
+def load_model_weights(model, optimizer, model_path):
+    """
+    Load model weights and set up the optimizer.
+    :param model: the model to load weights into
+    :param optimizer: the corresponding optimizer
+    :param model_path: path to the weight file
+    """
+    load_model_if_exists(model, model_path)
+    optimizer = optim.Adam(model.parameters())
+
+
+def select_action(env, model, optimizer, state, epsilon):
+    """
+    Select an action for the current player.
+    :param env: game environment
+    :param model: the player's model
+    :param optimizer: the player's optimizer
+    :param state: current state
+    :param epsilon: epsilon value of the epsilon-greedy policy
+    :return: the logits, the optimizer, and the chosen action
+    """
+    logits = model(state)
+    action = get_valid_action(logits.cpu().detach().numpy(), env.board, epsilon)
+    return logits, optimizer, action
+
 
-    # try to load the model weights
+def update_model(reward, logits, optimizer, action, env, criterion, device):
+    """
+    Update the model parameters according to the reward.
+    :param reward: the reward obtained
+    :param logits: the model output
+    :param optimizer: the optimizer
+    :param action: the action taken
+    :param env: game environment
+    :param criterion: loss function
+    :param device: compute device
+    """
+    if reward != 0: # only update the model when the reward is non-zero
+        target = torch.LongTensor([action]).to(device)
+        # improvement: scale the loss by the score
+        loss = criterion(logits.view(1, -1), target) * torch.FloatTensor([reward]).to(device)
+        optimizer.zero_grad()
+        loss.backward()
+        optimizer.step()
+
+
+def print_game_result(env, round, reward, current_player):
+    """
+    Print the game result and the final board state.
+    :param env: game environment
+    :param round: round number
+    :param reward: the reward
+    :param current_player: the player who just moved
+    """
+    if abs(reward) == 10000: # win by five in a row
+        print(f"Round {round}, Player {current_player} win with 5 in a row!")
+    # elif abs(reward) == 1000: # ordinary win or loss
+    #     print(f"Round {round}, Player {current_player} win!")
+    elif abs(reward) == 500: # four in a row
+        print(f"\tRound {round}, Player {current_player} has 4 in a row!")
+    elif abs(reward) == 100: # three in a row
+        print(f"\tRound {round}, Player {current_player} has 3 in a row!")
+    elif abs(reward) == 10: # two in a row
+        print(f"\tRound {round}, Player {current_player} has 2 in a row!")
+    #env.print_board()
+
+
+def train():
+    """
+    Main training loop.
+    """
+    device = setup_device()
+    env = Gomoku()
+    model1, model2, optimizer1, optimizer2 = setup_players_and_optimizers(device)
     load_model_if_exists(model1, 'gobang_best_model.pth')
     load_model_if_exists(model2, 'gobang_best_model.pth')
+    criterion1 = nn.CrossEntropyLoss()
+    criterion2 = nn.CrossEntropyLoss()
+    epsilon = 0.1
 
-    epsilon = 0.1 # epsilon for the epsilon-greedy policy
-    for round in range(100000): # more training rounds
+    for round in range(100000):
         env.reset()
         done = False
-
         while not done:
-            state = torch.FloatTensor(env.board.flatten()).unsqueeze(0).to(device) # add the batch dimension
-
+            state = torch.FloatTensor(env.board.flatten()).unsqueeze(0).to(device)
             if env.current_player == 1:
-                logits = model1(state)
-                optimizer = optimizer1
-                action = get_valid_action(logits.cpu().detach().numpy(), env.board, epsilon)
+                logits1, optimizer1, action = select_action(env, model1, optimizer1, state, epsilon)
             else:
-                logits = model2(state)
-                optimizer = optimizer2
-                action = get_valid_action(logits.cpu().detach().numpy(), env.board, 0.45) # extra randomness for Player 2
-
+                logits2, optimizer2, action = select_action(env, model2, optimizer2, state, 0.3)
             if action == -1:
                 break
-            reward, done = env.step(action)
+            current_player, done, score = env.step(action)
+            if current_player == 1:
+                update_model(score, logits1, optimizer1, action, env, criterion1, device)
+            else:
+                update_model(score, logits2, optimizer2, action, env, criterion2, device)
 
-            if reward != -1:
-                target = torch.LongTensor([action]).to(device)
-                loss = criterion(logits.view(1, -1), target)
-                optimizer.zero_grad()
-                loss.backward()
-                optimizer.step()
+            print_game_result(env, round, score, current_player)
 
-            if done and reward != 0:
-                print(f"Round {round}, Player {reward} win!")
-                if NEED_PRINT_BOARD:
-                    env.print_board() # print the final board state
-        # reset Player 2 every thousand rounds
         if (round + 1) % 1000 == 0:
             torch.save(model1.state_dict(), f'gobang_model_player1_{round + 1}.pth')
-            model2 = GomokuNetV2().to(device) # reset Player 2
+            model2 = GomokuNetV2().to(device)
             optimizer2 = optim.Adam(model2.parameters())
             random_pth_number = get_random_smaller_thousand_multiple(round + 1)
-            load_model_if_exists(model2, f'gobang_model_player1_{random_pth_number}.pth')
+            load_model_weights(model2, optimizer2, f'gobang_model_player1_{random_pth_number}.pth')
+
 
-    # save the final Player 1 model
     torch.save(model1.state_dict(), 'gobang_best_model.pth')
 
+
 if __name__ == "__main__":
     train()
diff --git a/val.py b/val.py
index 7ab9587..e0e81c7 100644
--- a/val.py
+++ b/val.py
@@ -13,8 +13,8 @@ def validator():
     model2 = GomokuNetV2().to(device)
 
     # load the model weights
-    load_model_if_exists(model1, 'gobang_model_player1_12000.pth')
-    load_model_if_exists(model2, 'gobang_model_player1_9000.pth')
+    load_model_if_exists(model1, 'gobang_best_model.pth')
+    load_model_if_exists(model2, 'gobang_best_model.pth')
 
     player1_win_count = 0
     player2_win_count = 0
@@ -28,26 +28,26 @@ def validator():
             state = torch.FloatTensor(env.board.flatten()).unsqueeze(0).to(device) # add the batch dimension
             if env.current_player == 1:
                 logits = model1(state)
-                action = get_valid_action(logits.cpu().detach().numpy(), env.board, 0.01)
+                action = get_valid_action(logits.cpu().detach().numpy(), env.board, 0.0001)
             else:
                 logits = model2(state)
-                action = get_valid_action(logits.cpu().detach().numpy(), env.board, 0.4) # extra randomness for Player 2
+                action = get_valid_action(logits.cpu().detach().numpy(), env.board, 0.1) # extra randomness for Player 2
             if action == -1:
                 print("No valid actions available. Ending the game.")
                 break
-            reward, done = env.step(action)
+            current_player, done, reward = env.step(action)
 
             #if NEED_PRINT_BOARD: # print intermediate state
                 #env.print_board()
             if done:
-                if reward == 1:
+                if current_player == 1:
                     player1_win_count += 1
-                elif reward == 2:
+                elif current_player == 2:
                     player2_win_count += 1
                 total_game_count = player1_win_count + player2_win_count
                 player1_win_rate = (player1_win_count / total_game_count) * 100 if total_game_count > 0 else 0
                 player2_win_rate = (player2_win_count / total_game_count) * 100 if total_game_count > 0 else 0
-                print(f"Validator Round {round},\tPlayer {reward} win!\tPlayer 1 win rate: {player1_win_rate:.2f}%, Player 2 win rate: {player2_win_rate:.2f}%")
+                print(f"Validator Round {round},\tPlayer {current_player} win!\tPlayer 1 win rate: {player1_win_rate:.2f}%, Player 2 win rate: {player2_win_rate:.2f}%")
                 if NEED_PRINT_BOARD:
                     env.print_board()
                 break