Commit b762a8ce by 前钰

first commit

# Auto detect text files and perform LF normalization
* text=auto
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="pytest" />
</component>
</module>
<component name="InspectionProjectProfileManager">
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="JavaScriptSettings">
<option name="languageLevel" value="ES6" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8" project-jdk-type="Python SDK" />
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/DDPG-main.iml" filepath="$PROJECT_DIR$/.idea/DDPG-main.iml" />
</modules>
</component>
</project>
import torch as T
import torch.nn.functional as F
import numpy as np
from networks import ActorNetwork, CriticNetwork
from buffer import ReplayBuffer
device = T.device("cuda:0" if T.cuda.is_available() else "cpu")
class DDPG:
def __init__(self, alpha, beta, state_dim, action_dim, actor_fc1_dim,
actor_fc2_dim, critic_fc1_dim, critic_fc2_dim, ckpt_dir,
gamma=0.99, tau=0.005, action_noise=0.1, max_size=1000000,
batch_size=256):
self.gamma = gamma
self.tau = tau
self.action_noise = action_noise
self.checkpoint_dir = ckpt_dir
self.actor = ActorNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
fc1_dim=actor_fc1_dim, fc2_dim=actor_fc2_dim)
self.target_actor = ActorNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
fc1_dim=actor_fc1_dim, fc2_dim=actor_fc2_dim)
self.critic = CriticNetwork(beta=beta, state_dim=state_dim, action_dim=action_dim,
fc1_dim=critic_fc1_dim, fc2_dim=critic_fc2_dim)
self.target_critic = CriticNetwork(beta=beta, state_dim=state_dim, action_dim=action_dim,
fc1_dim=critic_fc1_dim, fc2_dim=critic_fc2_dim)
self.memory = ReplayBuffer(max_size=max_size, state_dim=state_dim, action_dim=action_dim,
batch_size=batch_size)
self.update_network_parameters(tau=1.0)
def update_network_parameters(self, tau=None):
if tau is None:
tau = self.tau
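        # Soft (Polyak) update: target = tau * param + (1 - tau) * target; tau=1.0 performs a hard copy (used at initialization)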
for actor_params, target_actor_params in zip(self.actor.parameters(),
self.target_actor.parameters()):
target_actor_params.data.copy_(tau * actor_params + (1 - tau) * target_actor_params)
for critic_params, target_critic_params in zip(self.critic.parameters(),
self.target_critic.parameters()):
target_critic_params.data.copy_(tau * critic_params + (1 - tau) * target_critic_params)
def remember(self, state, action, reward, state_, done):
self.memory.store_transition(state, action, reward, state_, done)
def choose_action(self, observation, train=True):
self.actor.eval()
state = T.tensor([observation], dtype=T.float).to(device)
action = self.actor.forward(state).squeeze()
if train:
noise = T.tensor(np.random.normal(loc=0.0, scale=self.action_noise),
dtype=T.float).to(device)
action = T.clamp(action+noise, -1, 1)
self.actor.train()
return action.detach().cpu().numpy()
def learn(self):
if not self.memory.ready():
return
states, actions, reward, states_, terminals = self.memory.sample_buffer()
states_tensor = T.tensor(states, dtype=T.float).to(device)
actions_tensor = T.tensor(actions, dtype=T.float).to(device)
rewards_tensor = T.tensor(reward, dtype=T.float).to(device)
next_states_tensor = T.tensor(states_, dtype=T.float).to(device)
terminals_tensor = T.tensor(terminals).to(device)
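        # Bootstrapped TD target: r + gamma * Q'(s', mu'(s')), computed without gradient tracking and zeroed at terminal states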
with T.no_grad():
next_actions_tensor = self.target_actor.forward(next_states_tensor)
q_ = self.target_critic.forward(next_states_tensor, next_actions_tensor).view(-1)
q_[terminals_tensor] = 0.0
target = rewards_tensor + self.gamma * q_
q = self.critic.forward(states_tensor, actions_tensor).view(-1)
critic_loss = F.mse_loss(q, target.detach())
self.critic.optimizer.zero_grad()
critic_loss.backward()
self.critic.optimizer.step()
new_actions_tensor = self.actor.forward(states_tensor)
actor_loss = -T.mean(self.critic(states_tensor, new_actions_tensor))
self.actor.optimizer.zero_grad()
actor_loss.backward()
self.actor.optimizer.step()
self.update_network_parameters()
def save_models(self, episode):
self.actor.save_checkpoint(self.checkpoint_dir + 'Actor/DDPG_actor_{}.pth'.format(episode))
print('Saving actor network successfully!')
self.target_actor.save_checkpoint(self.checkpoint_dir +
'Target_actor/DDPG_target_actor_{}.pth'.format(episode))
print('Saving target_actor network successfully!')
        self.critic.save_checkpoint(self.checkpoint_dir + 'Critic/DDPG_critic_{}.pth'.format(episode))
        print('Saving critic network successfully!')
        self.target_critic.save_checkpoint(self.checkpoint_dir +
                                           'Target_critic/DDPG_target_critic_{}.pth'.format(episode))
        print('Saving target critic network successfully!')
def load_models(self, episode):
self.actor.load_checkpoint(self.checkpoint_dir + 'Actor/DDPG_actor_{}.pth'.format(episode))
print('Loading actor network successfully!')
self.target_actor.load_checkpoint(self.checkpoint_dir +
'Target_actor/DDPG_target_actor_{}.pth'.format(episode))
print('Loading target_actor network successfully!')
        self.critic.load_checkpoint(self.checkpoint_dir + 'Critic/DDPG_critic_{}.pth'.format(episode))
        print('Loading critic network successfully!')
        self.target_critic.load_checkpoint(self.checkpoint_dir +
                                           'Target_critic/DDPG_target_critic_{}.pth'.format(episode))
        print('Loading target critic network successfully!')
MIT License
Copyright (c) 2022 indigoLovee
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# DDPG
DDPG in PyTorch
# Simulation environment
LunarLanderContinuous-v2 from gym
# Dependencies
* gym
* numpy
* matplotlib
* python3.6
* pytorch1.6
# File description
* train.py is the training script; once the environment is set up it can be run directly, but an output_images folder must first be created in the current directory to hold the generated results;
* networks.py defines the actor and critic networks;
* buffer.py implements the experience replay buffer;
* DDPG.py implements the DDPG algorithm;
* utils.py is a toolbox of common helper functions;
* test.py is the test script: it loads the trained weights and runs them in the environment to check the training results.
# Simulation results
See the output_images folder.
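# Quick start
A minimal pre-run sketch (the folder names below are the defaults assumed by the scripts; adjust them if your layout differs):

```python
# Create the folders that train.py and test.py expect (assumed default paths)
import os

for d in ['output_images', 'checkpoints/DDPG']:
    os.makedirs(d, exist_ok=True)
```

After that, `python train.py` trains the agent and `python test.py` loads the saved weights for evaluation.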
import numpy as np
class ReplayBuffer:
def __init__(self, max_size, state_dim, action_dim, batch_size):
self.mem_size = max_size
self.batch_size = batch_size
self.mem_cnt = 0
self.state_memory = np.zeros((self.mem_size, state_dim))
self.action_memory = np.zeros((self.mem_size, action_dim))
self.reward_memory = np.zeros((self.mem_size, ))
self.next_state_memory = np.zeros((self.mem_size, state_dim))
        self.terminal_memory = np.zeros((self.mem_size, ), dtype=bool)
def store_transition(self, state, action, reward, state_, done):
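        # The write index wraps around, so the oldest transitions are overwritten once the buffer is full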
mem_idx = self.mem_cnt % self.mem_size
self.state_memory[mem_idx] = state
self.action_memory[mem_idx] = action
self.reward_memory[mem_idx] = reward
self.next_state_memory[mem_idx] = state_
self.terminal_memory[mem_idx] = done
self.mem_cnt += 1
def sample_buffer(self):
mem_len = min(self.mem_size, self.mem_cnt)
batch = np.random.choice(mem_len, self.batch_size, replace=False)
states = self.state_memory[batch]
actions = self.action_memory[batch]
rewards = self.reward_memory[batch]
states_ = self.next_state_memory[batch]
terminals = self.terminal_memory[batch]
return states, actions, rewards, states_, terminals
def ready(self):
return self.mem_cnt >= self.batch_size
import torch as T
import torch.nn as nn
import torch.optim as optim
device = T.device("cuda:0" if T.cuda.is_available() else "cpu")
def weight_init(m):
if isinstance(m, nn.Linear):
nn.init.xavier_normal_(m.weight)
if m.bias is not None:
nn.init.constant_(m.bias, 0.0)
elif isinstance(m, nn.BatchNorm1d):
nn.init.constant_(m.weight, 1.0)
nn.init.constant_(m.bias, 0.0)
class ActorNetwork(nn.Module):
def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
super(ActorNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, fc1_dim)
self.ln1 = nn.LayerNorm(fc1_dim)
self.fc2 = nn.Linear(fc1_dim, fc2_dim)
self.ln2 = nn.LayerNorm(fc2_dim)
self.action = nn.Linear(fc2_dim, action_dim)
self.optimizer = optim.Adam(self.parameters(), lr=alpha)
self.apply(weight_init)
self.to(device)
def forward(self, state):
x = T.relu(self.ln1(self.fc1(state)))
x = T.relu(self.ln2(self.fc2(x)))
action = T.tanh(self.action(x))
return action
def save_checkpoint(self, checkpoint_file):
T.save(self.state_dict(), checkpoint_file)
def load_checkpoint(self, checkpoint_file):
self.load_state_dict(T.load(checkpoint_file))
class CriticNetwork(nn.Module):
def __init__(self, beta, state_dim, action_dim, fc1_dim, fc2_dim):
super(CriticNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, fc1_dim)
self.ln1 = nn.LayerNorm(fc1_dim)
self.fc2 = nn.Linear(fc1_dim, fc2_dim)
self.ln2 = nn.LayerNorm(fc2_dim)
self.fc3 = nn.Linear(action_dim, fc2_dim)
self.q = nn.Linear(fc2_dim, 1)
self.optimizer = optim.Adam(self.parameters(), lr=beta, weight_decay=0.001)
self.apply(weight_init)
self.to(device)
def forward(self, state, action):
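        # Two layers process the state, a separate linear layer projects the action, and the two streams are summed before the Q head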
x_s = T.relu(self.ln1(self.fc1(state)))
x_s = self.ln2(self.fc2(x_s))
x_a = self.fc3(action)
x = T.relu(x_s + x_a)
q = self.q(x)
return q
def save_checkpoint(self, checkpoint_file):
T.save(self.state_dict(), checkpoint_file)
def load_checkpoint(self, checkpoint_file):
self.load_state_dict(T.load(checkpoint_file))
import gym
import imageio
import argparse
from DDPG import DDPG
from utils import scale_action
parser = argparse.ArgumentParser()
parser.add_argument('--filename', type=str, default='./output_images/LunarLander.gif')
parser.add_argument('--checkpoint_dir', type=str, default='./checkpoints/DDPG/')
parser.add_argument('--save_video', type=lambda s: str(s).lower() in ('true', '1'), default=True)
parser.add_argument('--fps', type=int, default=30)
parser.add_argument('--render', type=lambda s: str(s).lower() in ('true', '1'), default=True)
args = parser.parse_args()
def main():
env = gym.make('LunarLanderContinuous-v2')
agent = DDPG(alpha=0.0003, beta=0.0003, state_dim=env.observation_space.shape[0],
action_dim=env.action_space.shape[0], actor_fc1_dim=400, actor_fc2_dim=300,
critic_fc1_dim=400, critic_fc2_dim=300, ckpt_dir=args.checkpoint_dir,
batch_size=256)
agent.load_models(1000)
video = imageio.get_writer(args.filename, fps=args.fps)
done = False
observation = env.reset()
while not done:
if args.render:
env.render()
        action = agent.choose_action(observation, train=False)  # no exploration noise during evaluation
action_ = scale_action(action.copy(), env.action_space.high, env.action_space.low)
observation_, reward, done, info = env.step(action_)
observation = observation_
        if args.save_video:
            video.append_data(env.render(mode='rgb_array'))
    video.close()
    env.close()
if __name__ == '__main__':
main()
import gym
import numpy as np
import argparse
from DDPG import DDPG
from utils import create_directory, plot_learning_curve, scale_action
parser = argparse.ArgumentParser("DDPG parameters")
parser.add_argument('--max_episodes', type=int, default=1000)
parser.add_argument('--checkpoint_dir', type=str, default='./checkpoints/DDPG/')
parser.add_argument('--figure_file', type=str, default='./output_images/reward.png')
args = parser.parse_args()
def main():
env = gym.make('LunarLanderContinuous-v2')
agent = DDPG(alpha=0.0003, beta=0.0003, state_dim=env.observation_space.shape[0],
action_dim=env.action_space.shape[0], actor_fc1_dim=400, actor_fc2_dim=300,
critic_fc1_dim=400, critic_fc2_dim=300, ckpt_dir=args.checkpoint_dir,
batch_size=256)
create_directory(args.checkpoint_dir,
sub_paths=['Actor', 'Target_actor', 'Critic', 'Target_critic'])
reward_history = []
avg_reward_history = []
for episode in range(args.max_episodes):
done = False
total_reward = 0
observation = env.reset()
while not done:
action = agent.choose_action(observation, train=True)
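            # The actor outputs a tanh-bounded action in [-1, 1]; scale_action maps it onto the environment's action range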
action_ = scale_action(action.copy(), env.action_space.high, env.action_space.low)
observation_, reward, done, info = env.step(action_)
agent.remember(observation, action, reward, observation_, done)
agent.learn()
total_reward += reward
observation = observation_
reward_history.append(total_reward)
avg_reward = np.mean(reward_history[-100:])
avg_reward_history.append(avg_reward)
print('Ep: {} Reward: {:.1f} AvgReward: {:.1f}'.format(episode+1, total_reward, avg_reward))
if (episode + 1) % 200 == 0:
agent.save_models(episode+1)
episodes = [i+1 for i in range(args.max_episodes)]
plot_learning_curve(episodes, avg_reward_history, title='AvgReward',
ylabel='reward', figure_file=args.figure_file)
if __name__ == '__main__':
main()
import os
import numpy as np
import matplotlib.pyplot as plt
class OUActionNoise:
    def __init__(self, mu, sigma=0.2, theta=0.15, dt=1e-2, x0=None):
self.theta = theta
self.mu = mu
self.sigma = sigma
self.dt = dt
self.x0 = x0
self.reset()
def __call__(self):
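        # Ornstein-Uhlenbeck step: x_new = x_prev + theta * (mu - x_prev) * dt + sigma * sqrt(dt) * N(0, 1)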
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
self.x_prev = x
return x
def reset(self):
self.x_prev = self.x0 if self.x0 is not None else np.zeros_like(self.mu)
def create_directory(path: str, sub_paths: list):
for sub_path in sub_paths:
if not os.path.exists(path + sub_path):
os.makedirs(path + sub_path, exist_ok=True)
            print('Created path: {} successfully'.format(path + sub_path))
        else:
            print('Path: {} already exists'.format(path + sub_path))
def plot_learning_curve(episodes, records, title, ylabel, figure_file):
plt.figure()
plt.plot(episodes, records, color='r', linestyle='-')
plt.title(title)
plt.xlabel('episode')
plt.ylabel(ylabel)
    plt.savefig(figure_file)
    plt.show()
def scale_action(action, high, low):
action = np.clip(action, -1, 1)
weight = (high - low) / 2
bias = (high + low) / 2
action_ = action * weight + bias
return action_
import numpy as np
import gym
import time
import turtle
assert gym.__version__ == "0.18.0", "[Version WARNING] please try `pip install gym==0.18.0`"
class FrozenLakeWapper(gym.Wrapper):
def __init__(self, env):
gym.Wrapper.__init__(self, env)
self.max_y = env.desc.shape[0]
self.max_x = env.desc.shape[1]
self.t = None
self.unit = 50
def draw_box(self, x, y, fillcolor='', line_color='gray'):
self.t.up()
self.t.goto(x * self.unit, y * self.unit)
self.t.color(line_color)
self.t.fillcolor(fillcolor)
self.t.setheading(90)
self.t.down()
self.t.begin_fill()
for _ in range(4):
self.t.forward(self.unit)
self.t.right(90)
self.t.end_fill()
def move_player(self, x, y):
self.t.up()
self.t.setheading(90)
self.t.fillcolor('red')
self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)
def render(self):
if self.t == None:
self.t = turtle.Turtle()
self.wn = turtle.Screen()
self.wn.setup(self.unit * self.max_x + 100,
self.unit * self.max_y + 100)
self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
self.unit * self.max_y)
self.t.shape('circle')
self.t.width(2)
self.t.speed(0)
self.t.color('gray')
for i in range(self.desc.shape[0]):
for j in range(self.desc.shape[1]):
x = j
y = self.max_y - 1 - i
if self.desc[i][j] == b'S': # Start
self.draw_box(x, y, 'white')
elif self.desc[i][j] == b'F': # Frozen ice
self.draw_box(x, y, 'white')
elif self.desc[i][j] == b'G': # Goal
self.draw_box(x, y, 'yellow')
elif self.desc[i][j] == b'H': # Hole
self.draw_box(x, y, 'black')
else:
self.draw_box(x, y, 'white')
self.t.shape('turtle')
x_pos = self.s % self.max_x
y_pos = self.max_y - 1 - int(self.s / self.max_x)
self.move_player(x_pos, y_pos)
class CliffWalkingWapper(gym.Wrapper):
def __init__(self, env):
gym.Wrapper.__init__(self, env)
self.t = None
self.unit = 50
self.max_x = 12
self.max_y = 4
def draw_x_line(self, y, x0, x1, color='gray'):
assert x1 > x0
self.t.color(color)
self.t.setheading(0)
self.t.up()
self.t.goto(x0, y)
self.t.down()
self.t.forward(x1 - x0)
def draw_y_line(self, x, y0, y1, color='gray'):
assert y1 > y0
self.t.color(color)
self.t.setheading(90)
self.t.up()
self.t.goto(x, y0)
self.t.down()
self.t.forward(y1 - y0)
def draw_box(self, x, y, fillcolor='', line_color='gray'):
self.t.up()
self.t.goto(x * self.unit, y * self.unit)
self.t.color(line_color)
self.t.fillcolor(fillcolor)
self.t.setheading(90)
self.t.down()
self.t.begin_fill()
for i in range(4):
self.t.forward(self.unit)
self.t.right(90)
self.t.end_fill()
def move_player(self, x, y):
self.t.up()
self.t.setheading(90)
self.t.fillcolor('red')
self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)
def render(self):
if self.t == None:
self.t = turtle.Turtle()
self.wn = turtle.Screen()
self.wn.setup(self.unit * self.max_x + 100,
self.unit * self.max_y + 100)
self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
self.unit * self.max_y)
self.t.shape('circle')
self.t.width(2)
self.t.speed(0)
self.t.color('gray')
for _ in range(2):
self.t.forward(self.max_x * self.unit)
self.t.left(90)
self.t.forward(self.max_y * self.unit)
self.t.left(90)
for i in range(1, self.max_y):
self.draw_x_line(
y=i * self.unit, x0=0, x1=self.max_x * self.unit)
for i in range(1, self.max_x):
self.draw_y_line(
x=i * self.unit, y0=0, y1=self.max_y * self.unit)
for i in range(1, self.max_x - 1):
self.draw_box(i, 0, 'black')
self.draw_box(self.max_x - 1, 0, 'yellow')
self.t.shape('turtle')
x_pos = self.s % self.max_x
y_pos = self.max_y - 1 - int(self.s / self.max_x)
self.move_player(x_pos, y_pos)
class QLearningAgent(object):
def __init__(self,
obs_n,
act_n,
learning_rate=0.01,
gamma=0.9,
e_greed=0.1):
        self.act_n = act_n  # number of available actions
        self.lr = learning_rate  # learning rate
        self.gamma = gamma  # reward discount factor
        self.epsilon = e_greed  # probability of choosing a random action
self.Q = np.zeros((obs_n, act_n))
    # Sample an action for the given observation, with exploration
def sample(self, obs):
        if np.random.uniform(0, 1) < (1.0 - self.epsilon):  # exploit: choose the action with the highest Q value
            action = self.predict(obs)
        else:
            action = np.random.choice(self.act_n)  # explore: choose a random action with probability epsilon
return action
    # Greedily predict the action for the given observation (no exploration)
def predict(self, obs):
Q_list = self.Q[obs, :]
maxQ = np.max(Q_list)
        action_list = np.where(Q_list == maxQ)[0]  # maxQ may correspond to several actions
action = np.random.choice(action_list)
return action
    # Learning step: update the Q-table
def learn(self, obs, action, reward, next_obs, done):
""" off-policy
obs: 交互前的obs, s_t
action: 本次交互选择的action, a_t
reward: 本次动作获得的奖励r
next_obs: 本次交互后的obs, s_t+1
done: episode是否结束
"""
predict_Q = self.Q[obs, action]
if done:
            target_Q = reward  # no next state
else:
target_Q = reward + self.gamma * np.max(self.Q[next_obs, :]) # Q-learning
        self.Q[obs, action] += self.lr * (target_Q - predict_Q)  # move Q(s, a) toward the target
    # Save the Q-table to a file
def save(self):
npy_file = './q_table.npy'
np.save(npy_file, self.Q)
print(npy_file + ' saved.')
    # Load the Q-table from a file
def restore(self, npy_file='./q_table.npy'):
self.Q = np.load(npy_file)
print(npy_file + ' loaded.')
def run_episode(env, agent, render=False):
    total_steps = 0  # count the steps taken in this episode
    total_reward = 0
    obs = env.reset()  # reset the environment and start a new episode
    while True:
        action = agent.sample(obs)  # choose an action according to the algorithm
        next_obs, reward, done, _ = env.step(action)  # interact with the environment
        # Q-learning update
        agent.learn(obs, action, reward, next_obs, done)
        obs = next_obs  # move on to the next observation
        total_reward += reward
        total_steps += 1  # count the steps
        if render:
            env.render()  # render a new frame
if done:
break
return total_reward, total_steps
def test_episode(env, agent):
total_reward = 0
obs = env.reset()
while True:
action = agent.predict(obs) # greedy
next_obs, reward, done, _ = env.step(action)
total_reward += reward
obs = next_obs
time.sleep(0.5)
env.render()
if done:
print('test reward = %.1f' % (total_reward))
break
def main():
# env = gym.make("FrozenLake-v0", is_slippery=False) # 0 left, 1 down, 2 right, 3 up
# env = FrozenLakeWapper(env)
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
env = CliffWalkingWapper(env)
agent = QLearningAgent(
obs_n=env.observation_space.n,
act_n=env.action_space.n,
learning_rate=0.1,
gamma=0.9,
e_greed=0.1)
is_render = False
for episode in range(500):
ep_reward, ep_steps = run_episode(env, agent, is_render)
print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps,
ep_reward))
        # render every 20 episodes to check progress
if episode % 20 == 0:
is_render = True
else:
is_render = False
    # training finished; evaluate the learned policy
test_episode(env, agent)
if __name__ == "__main__":
main()
import paddle
import paddle.nn as nn
import paddle.vision.transforms as T
import numpy as np
import random, datetime, os, copy
# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack
# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace
# Super Mario environment for OpenAI Gym
import gym_super_mario_bros
import paddle.nn.functional as F
import collections
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0")
# Limit the action-space to
# 0. walk right
# 1. jump right
env = JoypadSpace(env, [["right"], ["right", "A"]])
env.reset()
next_state, reward, done, info = env.step(action=0)
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")
class SkipFrame(gym.Wrapper):
def __init__(self, env, skip):
"""Return only every `skip`-th frame"""
super().__init__(env)
self._skip = skip
def step(self, action):
"""Repeat action, and sum reward"""
total_reward = 0.0
done = False
for i in range(self._skip):
# Accumulate reward and repeat the same action
obs, reward, done, info = self.env.step(action)
total_reward += reward
if done:
break
return obs, total_reward, done, info
class GrayScaleObservation(gym.ObservationWrapper):
def __init__(self, env):
super().__init__(env)
obs_shape = self.observation_space.shape[:2]
self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
def observation(self, observation):
transform = T.Grayscale()
observation = transform(observation)
observation = np.transpose(observation, (2, 0, 1)).squeeze(0)
# observation = paddle.to_tensor(observation.copy(), dtype="float32")
return observation
class ResizeObservation(gym.ObservationWrapper):
def __init__(self, env, shape):
super().__init__(env)
if isinstance(shape, int):
self.shape = (shape, shape)
else:
self.shape = tuple(shape)
obs_shape = self.shape + self.observation_space.shape[2:]
self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
def observation(self, observation):
transforms = T.Compose(
[T.Resize(self.shape), T.Normalize(0, 255, data_format='HWC')]
# [T.Resize(self.shape), T.Normalize(0, 255)] T.Normalize(mean=0, std=255, data_format='HWC')
)
observation = transforms(observation)
return observation
# Apply Wrappers to environment
env = SkipFrame(env, skip=4)
env = ResizeObservation(env, shape=84)
env = GrayScaleObservation(env)
env = FrameStack(env, num_stack=4)
env.reset()
next_state, reward, done, info = env.step(action=0)
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")
print(next_state)
class Model(nn.Layer):
def __init__(self, num_inputs, num_actions):
super(Model, self).__init__()
self.conv1 = nn.Conv2D(num_inputs, 32, 8, stride=4)
self.conv2 = nn.Conv2D(32, 64, 4, stride=2)
self.conv3 = nn.Conv2D(64, 64, 3, stride=1)
self.flatten = nn.Flatten()
self.linear = nn.Linear(3136, 512)
self.fc = nn.Linear(512, num_actions)
def forward(self, x):
x = F.relu(self.conv1(x))
x = F.relu(self.conv2(x))
x = F.relu(self.conv3(x))
x = self.flatten(x)
x = self.linear(x)
return self.fc(x)
class ReplayMemory(object):
def __init__(self, max_size):
self.buffer = collections.deque(maxlen=max_size)
def append(self, exp):
self.buffer.append(exp)
def sample(self, batch_size):
mini_batch = random.sample(self.buffer, batch_size)
batch_obs, batch_action, batch_reword, batch_next_obs, batch_done = [], [], [], [], []
for experience in mini_batch:
s, a, r, s_p, isOver = experience
batch_obs.append(s)
batch_action.append(a)
batch_reword.append(r)
batch_next_obs.append(s_p)
batch_done.append(isOver)
batch_obs = paddle.to_tensor(batch_obs, dtype='float32')
batch_action = paddle.to_tensor(batch_action, dtype='int64')
batch_reword = paddle.to_tensor(batch_reword, dtype='float32')
batch_next_obs = paddle.to_tensor(batch_next_obs, dtype='float32')
batch_done = paddle.to_tensor(batch_done, dtype='int64')
return batch_obs, batch_action, batch_reword, batch_next_obs, batch_done
def __len__(self):
return len(self.buffer)
# Training hyper-parameters
batch_size = 32  # batch size
num_episodes = 10000  # number of training episodes
memory_size = 20000  # replay memory capacity
learning_rate = 1e-4  # learning rate
e_greed = 0.1  # initial exploration probability
gamma = 0.99  # reward discount factor
e_greed_decrement = 1e-6  # gradually reduce the exploration probability during training
update_num = 0  # counts how many target-model updates have been performed
obs_shape = (4, 84, 84)  # shape of the observed image stack
save_model_path = "models/model(1-1).pdparams"  # path for saving the model
obs_dim = obs_shape[0]
action_dim = env.action_space.n
policyQ = Model(obs_dim, action_dim)
targetQ = Model(obs_dim, action_dim)
targetQ.eval()
if os.path.exists(save_model_path):
model_state_dict = paddle.load(save_model_path)
policyQ.set_state_dict(model_state_dict )
print('policyQ Model loaded')
targetQ.set_state_dict(model_state_dict )
print('targetQ Model loaded')
rpm = ReplayMemory(memory_size)
optimizer = paddle.optimizer.Adam(parameters=policyQ.parameters(),
learning_rate=learning_rate)
# Evaluate the model
def evaluate():
total_reward = 0
obs = env.reset()
while True:
obs = np.expand_dims(obs, axis=0)
obs = paddle.to_tensor(obs, dtype='float32')
action = targetQ(obs)
action = paddle.argmax(action).numpy()[0]
next_obs, reward, done, info = env.step(action)
obs = next_obs
total_reward += reward
if done:
break
return total_reward
def soft_update(target, source, tau):
for target_param, param in zip(target.parameters(), source.parameters()):
target_param.set_value( target_param * (1.0 - tau) + param * tau)
# Train the model
def train():
global e_greed, update_num
total_reward = 0
    # Reset the game state
obs = env.reset()
while True:
        # Epsilon-greedy action selection
e_greed = max(0.01, e_greed - e_greed_decrement)
if np.random.rand() < e_greed:
            # Take a random action
action = np.random.randint(action_dim)
else:
            # Let the policy model predict the action
obs1 = np.expand_dims(obs, axis=0)
action = policyQ(paddle.to_tensor(obs1, dtype='float32'))
action = paddle.argmax(action).numpy()[0]
        # Step the game
next_obs, reward, done, info = env.step(action)
env.render()
total_reward += reward
        # Store the transition
rpm.append((obs, action, reward, next_obs, done))
obs = next_obs
        # Episode finished
if done:
break
        # Start training once the buffer holds more than batch_size transitions
if len(rpm) > batch_size:
            # Sample a training batch
batch_obs, batch_action, batch_reword, batch_next_obs, batch_done = rpm.sample(batch_size)
            # Compute the loss
action_value = policyQ(batch_obs)
action_onehot = paddle.nn.functional.one_hot(batch_action, action_dim)
pred_action_value = paddle.sum(action_value * action_onehot, axis=1)
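            # Double DQN: the online network (policyQ) selects the next action, the target network (targetQ) evaluates it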
batch_argmax_action = paddle.argmax(policyQ(batch_next_obs), axis=1)
v = targetQ(batch_next_obs)
select_v = []
for i in range(v.shape[0]):
select_v.append(v[i][int(batch_argmax_action[i].numpy()[0])])
select_v = paddle.stack(select_v).squeeze()
select_v.stop_gradient = True
target = batch_reword + gamma * select_v * (1.0 - batch_done)
cost = paddle.nn.functional.mse_loss(pred_action_value, target)
            # Gradient update
cost.backward()
optimizer.step()
optimizer.clear_grad()
            # (Hard-update alternative) refresh the target model every fixed number of steps:
            # if update_num % 200 == 0:
            #     targetQ.load_dict(policyQ.state_dict())
            # Soft-update the target model parameters
soft_update(targetQ, policyQ, tau = 0.001)
update_num += 1
return total_reward
if __name__ == '__main__':
episode = 0
while episode < num_episodes:
for t in range(3):
train_reward = train()
episode += 1
print('Episode: {}, Reward: {:.2f}, e_greed: {:.2f}'.format(episode, train_reward, e_greed))
if episode % 3 == 0:
eval_reward = evaluate()
print('Episode:{} test_reward:{}'.format(episode, eval_reward))
if eval_reward > 2500:
paddle.save(targetQ.state_dict(), 'models/model(1-1)_test_{:.2f}.pdparams'.format(eval_reward))
        # Save the model
if not os.path.exists(os.path.dirname(save_model_path)):
os.makedirs(os.path.dirname(save_model_path))
paddle.save(targetQ.state_dict(), save_model_path)
import gym
import numpy as np
import torch
import matplotlib.pyplot as plt
l1 = 4  # the input length is 4 (the CartPole observation)
l2 = 150  # hidden layer of 150 units
l3 = 2  # the output is a length-2 vector over the left/right actions
env = gym.make("CartPole-v0")
model = torch.nn.Sequential(
torch.nn.Linear(l1, l2),
    torch.nn.LeakyReLU(),  # LeakyReLU activation; replacing it (e.g. with plain ReLU) makes the results worse
    torch.nn.Linear(l2, l3),
    torch.nn.Softmax(dim=-1)  # output a softmax probability distribution over the actions (dim=-1 also handles batched input correctly)
)
learning_rate = 0.009
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# state1 = env.reset()
# pred = model(torch.from_numpy(state1).float())  # let the policy network predict the action probabilities
# action = np.random.choice(np.array([0, 1]), p=pred.data.numpy())  # sample an action from the predicted distribution
# state2, reward, done, info = env.step(action)  # take the action and receive the new state and reward; info is diagnostic output from the environment
def discount_rewards(rewards, gamma=0.99):
lenr = len(rewards)
    disc_return = torch.pow(gamma, torch.arange(lenr).float()) * rewards  # exponentially discounted rewards
    disc_return /= disc_return.max()  # normalize the rewards to [0, 1] for numerical stability
    return disc_return
def loss_fn(preds, r):  # expects the probabilities of the actions that were taken and the discounted returns
    return -1 * torch.sum(r * torch.log(preds))  # weight the log-probabilities by the returns, sum them, and negate the result
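# REINFORCE in one line: with r_t the normalized discounted return from discount_rewards and p_t the probability
# assigned to the action actually taken at step t, the loss above is -sum_t r_t * log(p_t)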
MAX_DUR = 200
MAX_EPISODES = 500
gamma = 0.99
score = []  # list of episode lengths recorded during training
expectation = 0.0
for episode in range(MAX_EPISODES):
curr_state = env.reset()
# env.render()
done = False
    transitions = []  # list of (state, action, reward) tuples (the reward itself is ignored here)
for t in range(MAX_DUR):
        act_prob = model(torch.from_numpy(curr_state).float())  # get the action probabilities
        action = np.random.choice(np.array([0, 1]), p=act_prob.data.numpy())  # sample an action from them
        prev_state = curr_state
        curr_state, _, done, info = env.step(action)  # take the action in the environment
        transitions.append((prev_state, action, t + 1))  # store this transition
        if done:  # exit the loop if the game is lost
break
ep_len = len(transitions)
    score.append(ep_len)  # store the episode duration
print(ep_len)
    reward_batch = torch.Tensor([r for (s, a, r) in transitions]).flip(dims=(0,))  # collect all rewards of the episode in a single tensor
    disc_returns = discount_rewards(reward_batch)  # compute the discounted returns
    state_batch = torch.Tensor([s for (s, a, r) in transitions])  # collect the states of the episode in a single tensor
    action_batch = torch.Tensor([a for (s, a, r) in transitions])  # collect the actions of the episode in a single tensor
    pred_batch = model(state_batch)  # recompute the action probabilities for all states of the episode
    prob_batch = pred_batch.gather(dim=1, index=action_batch.long().view(-1, 1)).squeeze()  # keep only the probabilities of the actions that were actually taken
loss = loss_fn(prob_batch, disc_returns)
optimizer.zero_grad()
loss.backward()
optimizer.step()
env.render()
score = np.array(score)
# avg_score = running_mean(score, 50)
plt.figure(figsize=(10, 7))
plt.ylabel("Episode Duration", fontsize=22)
plt.xlabel("Training Epochs", fontsize=22)
plt.plot(score, color='green')
plt.show()
import gym
import time
# assert gym.__version__ == "0.18.0", "[Version WARNING] please try `pip install gym==0.18.0`"
import turtle
import numpy as np
# agent
class SarsaAgent(object):
def __init__(self,
                 obs_n,  # number of states (the environment's observation space size)
act_n,
learning_rate=0.01,
gamma=0.9,
e_greed=0.1):
        self.act_n = act_n  # number of available actions
        self.Q = np.zeros((obs_n, act_n))  # the Q-table, initialized to all zeros
        self.lr = learning_rate  # learning rate (the hyper-parameter alpha)
        self.gamma = gamma  # reward discount factor (the hyper-parameter gamma)
        self.epsilon = e_greed  # probability of choosing a random action
    # Sample an action for the given observation, with exploration (training-time trial and error happens here)
def sample(self, obs):
        # With probability 1 - epsilon follow the learned Q values; otherwise fall through to the random branch below
        if np.random.uniform(0, 1) < (1.0 - self.epsilon):  # choose the action with the highest Q value
            action = self.predict(obs)  # exploit the learned experience
        else:
            action = np.random.choice(self.act_n)  # explore: choose a random action with probability epsilon
return action
    # Greedily predict the action for the given observation (no exploration)
def predict(self, obs):
        Q_list = self.Q[obs, :]  # extract the row of Q values for this state
        maxQ = np.max(Q_list)  # greedy: find the largest Q value in that row
        action_list = np.where(Q_list == maxQ)[0]
        action = np.random.choice(action_list)  # maxQ may correspond to several actions; pick one of them at random
return action
    # The core of the algorithm: the learning step that updates the Q-table
def learn(self, obs, action, reward, next_obs, next_action, done):
""" on-policy
obs: 交互前的obs, s_t
action: 本次交互选择的action, a_t
reward: 本次动作获得的奖励r
next_obs: 本次交互后的obs, s_t+1
next_action: 根据当前Q表格, 针对next_obs会选择的动作, a_t+1
done: episode是否结束
"""
predict_Q = self.Q[obs, action]
if done:
            target_Q = reward  # no next state
        else:
            target_Q = reward + self.gamma * self.Q[next_obs, next_action]  # Sarsa
        self.Q[obs, action] += self.lr * (target_Q - predict_Q)  # move Q(s, a) toward the target; lr is the learning rate (alpha)
def save(self):
npy_file = './q_table.npy'
np.save(npy_file, self.Q)
print(npy_file + ' saved.')
def restore(self, npy_file='./q_table.npy'):
self.Q = np.load(npy_file)
print(npy_file + ' loaded.')
# Environment visualization
class FrozenLakeWapper(gym.Wrapper):
def __init__(self, env):
gym.Wrapper.__init__(self, env)
self.max_y = env.desc.shape[0]
self.max_x = env.desc.shape[1]
self.t = None
self.unit = 50
def draw_box(self, x, y, fillcolor='', line_color='gray'):
self.t.up()
self.t.goto(x * self.unit, y * self.unit)
self.t.color(line_color)
self.t.fillcolor(fillcolor)
self.t.setheading(90)
self.t.down()
self.t.begin_fill()
for _ in range(4):
self.t.forward(self.unit)
self.t.right(90)
self.t.end_fill()
def move_player(self, x, y):
self.t.up()
self.t.setheading(90)
self.t.fillcolor('red')
self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)
def render(self):
if self.t == None:
self.t = turtle.Turtle()
self.wn = turtle.Screen()
self.wn.setup(self.unit * self.max_x + 100,
self.unit * self.max_y + 100)
self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
self.unit * self.max_y)
self.t.shape('circle')
self.t.width(2)
self.t.speed(0)
self.t.color('gray')
for i in range(self.desc.shape[0]):
for j in range(self.desc.shape[1]):
x = j
y = self.max_y - 1 - i
if self.desc[i][j] == b'S': # Start
self.draw_box(x, y, 'white')
elif self.desc[i][j] == b'F': # Frozen ice
self.draw_box(x, y, 'white')
elif self.desc[i][j] == b'G': # Goal
self.draw_box(x, y, 'yellow')
elif self.desc[i][j] == b'H': # Hole
self.draw_box(x, y, 'black')
else:
self.draw_box(x, y, 'white')
self.t.shape('turtle')
x_pos = self.s % self.max_x
y_pos = self.max_y - 1 - int(self.s / self.max_x)
self.move_player(x_pos, y_pos)
# Environment visualization
class CliffWalkingWapper(gym.Wrapper):
def __init__(self, env):
gym.Wrapper.__init__(self, env)
self.t = None
self.unit = 50
self.max_x = 12
self.max_y = 4
def draw_x_line(self, y, x0, x1, color='gray'):
assert x1 > x0
self.t.color(color)
self.t.setheading(0)
self.t.up()
self.t.goto(x0, y)
self.t.down()
self.t.forward(x1 - x0)
def draw_y_line(self, x, y0, y1, color='gray'):
assert y1 > y0
self.t.color(color)
self.t.setheading(90)
self.t.up()
self.t.goto(x, y0)
self.t.down()
self.t.forward(y1 - y0)
def draw_box(self, x, y, fillcolor='', line_color='gray'):
self.t.up()
self.t.goto(x * self.unit, y * self.unit)
self.t.color(line_color)
self.t.fillcolor(fillcolor)
self.t.setheading(90)
self.t.down()
self.t.begin_fill()
for i in range(4):
self.t.forward(self.unit)
self.t.right(90)
self.t.end_fill()
def move_player(self, x, y):
self.t.up()
self.t.setheading(90)
self.t.fillcolor('red')
self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)
    # Rendering interface
def render(self):
if self.t == None:
self.t = turtle.Turtle()
self.wn = turtle.Screen()
self.wn.setup(self.unit * self.max_x + 100,
self.unit * self.max_y + 100)
self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
self.unit * self.max_y)
self.t.shape('circle')
self.t.width(2)
self.t.speed(0)
self.t.color('gray')
for _ in range(2):
self.t.forward(self.max_x * self.unit)
self.t.left(90)
self.t.forward(self.max_y * self.unit)
self.t.left(90)
for i in range(1, self.max_y):
self.draw_x_line(
y=i * self.unit, x0=0, x1=self.max_x * self.unit)
for i in range(1, self.max_x):
self.draw_y_line(
x=i * self.unit, y0=0, y1=self.max_y * self.unit)
for i in range(1, self.max_x - 1):
self.draw_box(i, 0, 'black')
self.draw_box(self.max_x - 1, 0, 'yellow')
self.t.shape('turtle')
x_pos = self.s % self.max_x
y_pos = self.max_y - 1 - int(self.s / self.max_x)
self.move_player(x_pos, y_pos)
def run_episode(env, agent, render=False):
    total_steps = 0  # count the steps taken in this episode
    total_reward = 0
    obs = env.reset()  # reset the environment and start a new episode
    action = agent.sample(obs)  # choose an action according to the algorithm
    while True:
        next_obs, reward, done, _ = env.step(action)  # interact with the environment
        next_action = agent.sample(next_obs)  # choose the next action according to the algorithm
        # Sarsa update
        agent.learn(obs, action, reward, next_obs, next_action, done)
        action = next_action
        obs = next_obs  # move on to the next observation
        total_reward += reward
        total_steps += 1  # count the steps
        if render:
            env.render()  # render a new frame
if done:
break
return total_reward, total_steps
def test_episode(env, agent):
total_reward = 0
obs = env.reset()
while True:
action = agent.predict(obs) # greedy
next_obs, reward, done, _ = env.step(action)
total_reward += reward
obs = next_obs
time.sleep(0.5)
env.render()
if done:
print('test reward = %.1f' % (total_reward))
break
def main():  # entry point
# env = gym.make("FrozenLake-v0", is_slippery=False) # 0 left, 1 down, 2 right, 3 up
# env = FrozenLakeWapper(env)
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
env = CliffWalkingWapper(env)
agent = SarsaAgent(
obs_n=env.observation_space.n,
act_n=env.action_space.n,
learning_rate=0.1,
gamma=0.9,
e_greed=0.1)
is_render = False
for episode in range(500):
ep_reward, ep_steps = run_episode(env, agent, is_render)
print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps, ep_reward))
        # render every 20 episodes to check progress
if episode % 20 == 0:
is_render = True
else:
is_render = False
    # training finished; evaluate the learned policy
test_episode(env, agent)
if __name__ == "__main__":
main()
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from matplotlib import pyplot as plt
class Environment:
def __init__(self, data, initial_oi, max_steps):
self.data = data
self.initial_oi = initial_oi
self.max_steps = max_steps
self.reset()
def reset(self):
self.step = 0
self.position = self.initial_oi
self.profit = 0
self.done = False
def get_state(self):
return np.array([
self.data['open'][self.step],
self.data['high'][self.step],
self.data['low'][self.step],
self.data['close'][self.step],
self.data['volume'][self.step],
self.data['open_oi'][self.step],
self.data['close_oi'][self.step],
self.position
])
def take_action(self, action):
if action == 0: # Buy
self.position += int(0.2 * self.position)
elif action == 1: # Sell
self.position -= int(0.2 * self.position)
self.profit += self.position * (self.data['close'][self.step + 1] - self.data['open'][self.step])
self.step += 1
if self.step >= self.max_steps:
self.done = True
def get_reward(self):
return self.profit
def is_done(self):
return self.done
class DQN(nn.Module):
def __init__(self, state_size, action_size):
super(DQN, self).__init__()
self.fc1 = nn.Linear(state_size, 64)
self.fc2 = nn.Linear(64, 64)
self.fc3 = nn.Linear(64, action_size)
def forward(self, x):
        # Tweak: add a residual connection between the two hidden layers
out1 = F.relu(self.fc1(x))
out2 = F.relu(self.fc2(out1))
out2 += out1
x = self.fc3(out2)
return x
class Agent:
def __init__(self, state_size, action_size, learning_rate, gamma, epsilon):
self.state_size = state_size
self.action_size = action_size
self.learning_rate = learning_rate
self.gamma = gamma
self.epsilon = epsilon
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Q-Network
self.q_network = DQN(state_size, action_size).to(self.device)
self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
def get_action(self, state):
state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
with torch.no_grad():
q_values = self.q_network(state)
if np.random.rand() <= self.epsilon:
action = np.random.choice(self.action_size)
else:
action = torch.argmax(q_values).item()
return action
    def update_model(self, state, action, reward, next_state, done):  # one-step update from (s, a, r, s_next)
state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
next_state = torch.from_numpy(next_state).float().unsqueeze(0).to(self.device)
action = torch.tensor(action).long().unsqueeze(0).to(self.device)
reward = torch.tensor([reward], dtype=torch.float).unsqueeze(0).to(self.device)
done = torch.tensor([done], dtype=torch.float).unsqueeze(0).to(self.device)
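        # One-step TD target: r + gamma * max_a Q(s', a) * (1 - done); the same network provides both the estimate and the target here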
q_values = self.q_network(state)
next_q_values = self.q_network(next_state)
q_value = q_values.gather(1, action.unsqueeze(1)).squeeze(1)
next_q_value = next_q_values.max(1)[0]
expected_q_value = reward + self.gamma * next_q_value * (1 - done)
loss = F.smooth_l1_loss(q_value, expected_q_value.detach())
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Read the CSV data
data = pd.read_csv('G:/AAADT/code/TCL/CZCE.TA401_kline_1m.csv')
# Hyper-parameters
state_size = 8
action_size = 3
learning_rate = 0.001
gamma = 0.99
epsilon = 0.1
initial_oi = data['open_oi'][0]
max_steps = len(data) - 1
# Create the Agent and Environment instances
agent = Agent(state_size, action_size, learning_rate, gamma, epsilon)
env = Environment(data, initial_oi, max_steps)
# Train the agent
num_episodes = 100
show = []
for episode in range(num_episodes):
env.reset()
state = env.get_state()
total_reward = 0
while not env.is_done():
action = agent.get_action(state)
env.take_action(action)
next_state = env.get_state()
reward = env.get_reward()
done = env.is_done()
agent.update_model(state, action, reward, next_state, done)
state = next_state
total_reward += reward
if episode % 10 == 0:
show.append(total_reward)
print(f"Episode: {episode + 1}, Total Reward: {total_reward}")
# Trade with the trained agent
# env.reset()
# state = env.get_state()
# total_reward = 0
#
# while not env.is_done():
# action = agent.get_action(state)
# env.take_action(action)
# next_state = env.get_state()
# reward = env.get_reward()
# done = env.is_done()
#
# state = next_state
# total_reward += reward
# print(f"Total Reward: {total_reward}")
plt.plot(show, '-', c='r', label='reward')
plt.show()