Commit 51caa728 by Leo

upload code

parent d0fd94df
import random
import gym
import gym_super_mario_bros  # assumed dependency: importing it registers the SuperMarioBros-* environments with gym
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# Define the Q-network: a simple MLP mapping a flat state vector to one Q-value per action
class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
# Define the experience replay buffer (fixed-capacity ring buffer of transitions)
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        # np.random.choice cannot sample from a list of tuples, so use random.sample instead
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)
# Define the DQN agent
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Online network and target network; the target network is a periodically
        # synchronised copy used to compute stable TD targets
        self.model = DQN(state_size, action_size).to(self.device)
        self.target_model = DQN(state_size, action_size).to(self.device)
        self.target_model.load_state_dict(self.model.state_dict())
        self.target_model.eval()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.loss_fn = nn.MSELoss()
        self.replay_buffer = ReplayBuffer(capacity=10000)
        self.batch_size = 32
        self.gamma = 0.99                  # discount factor
        self.epsilon = 1.0                 # initial exploration rate
        self.epsilon_decay = 0.999
        self.epsilon_min = 0.01
        self.update_target_interval = 100  # episodes between target-network updates

    def select_action(self, state):
        # Epsilon-greedy action selection
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            q_values = self.model(state)
        action = q_values.argmax().item()
        return action

    def train(self):
        if len(self.replay_buffer) < self.batch_size:
            return
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
        # Stack into numpy arrays first; building tensors from sequences of arrays is very slow
        states = torch.FloatTensor(np.array(states)).to(self.device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)
        # Q(s, a) for the actions actually taken
        q_values = self.model(states).gather(1, actions)
        # TD target: r + gamma * max_a' Q_target(s', a'), zeroed on terminal transitions
        next_q_values = self.target_model(next_states).max(1)[0].unsqueeze(1)
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values
        loss = self.loss_fn(q_values, target_q_values.detach())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def update_epsilon(self):
        self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)
# Create the environment and the DQN agent
# NOTE: the raw SuperMarioBros observation is an RGB frame, while this MLP agent assumes
# a flat 1-D state vector; in practice the observations would need to be preprocessed
# (e.g. downscaled, grayscaled and flattened) or the network replaced with a CNN, and the
# raw NES action space is usually restricted with nes_py's JoypadSpace wrapper.
env = gym.make('SuperMarioBros-1-1-v0')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

# Train the DQN agent
num_episodes = 1000
scores = []
for episode in range(num_episodes):
    state = env.reset()
    score = 0
    done = False
    while not done:
        action = agent.select_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.replay_buffer.push(state, action, reward, next_state, done)
        state = next_state
        score += reward
        agent.train()
    # Periodically sync the target network; decay epsilon after every episode
    if episode % agent.update_target_interval == 0:
        agent.update_target_model()
    agent.update_epsilon()
    scores.append(score)
    if episode % 100 == 0:
        print(f"Episode: {episode}, Average Score: {np.mean(scores[-100:])}")

# Plot the score curve over training
plt.plot(scores)
plt.xlabel('Episode')
plt.ylabel('Score')
plt.title('Training Progress')
plt.show()