Predicting Stock Price Trends with the DDPG Reinforcement Learning Algorithm

This post uses the DDPG reinforcement-learning algorithm to predict a stock's closing-price movement over the next few days.
First, download the stock's historical price data from the web and save it as a .csv file, then read it with the environment code below (a quick sanity check of the csv layout is sketched first):
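The exact column layout varies between data sources, so it is worth confirming which csv column actually holds the closing price before running the environment code (which reads column index 3). A minimal check, assuming the same hard-coded path used in the environment below:

import csv

path = 'D:\\Simulation\\优秀项目\\Stock\\(ddpg)stock_predict\\600420.csv'  # same path as in read_data()
with open(path, 'r') as f:
    reader = csv.reader(f)
    header = next(reader)      # header row (the environment skips it as well)
    first_row = next(reader)   # first data row
print(list(enumerate(header)))  # check which index is the closing price
print(first_row)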

"""
self.x 存储收盘价,剔除数据为零的无效数据
self.x中读取的csv文件数据顺序和scv中顺序一样,注意数据时间的排序
注意训练的数据,不要用其他的数据训练并预测,同时注意读取的收盘价的数据,要修改代码读取csv对应的列数据
"""
import matplotlib.pyplot as plt
import csv
class Env():
    def __init__(self):
        self.x = self.read_data()
        # Range of the continuous action (the predicted closing price)
        self.action_up = 100
        self.action_down = 0
        # Number of data points observed so far
        self.observation_lenth = 100
        # Number of data points in each observation window
        self.length_obs = 100
        self.obs_step = 1  # the window advances one day per step
        self.observation_space_features = 100
        # Length of the original data, used to work out where prediction starts
        self.data_lenth = 0
        # Number of days to predict
        self.predict_lenth = 15
        # Predicted values
        self.predict_list = []

    def read_data(self):
        path = 'D:\\Simulation\\优秀项目\\Stock\\(ddpg)stock_predict\\600420.csv'
        x = []
        with open(path, 'r') as f:
            reader = csv.reader(f)
            for i in reader:
                x.append(i[3])  # read column index 3 (the closing price); values are str
        del(x[0])  # drop the header cell, which is text and cannot be converted to float
        # Convert the str elements of x to float
        y = []
        for i in x:
            # Discard zero values, which are invalid records
            k = float(i)
            if k != 0:
                y.append(k)
        # Reverse y so that the most recent prices come last and the oldest come first
        y = y[::-1]
        # print("Data check, last three days:", y[-3:])
        # Record the length of the original data
        self.data_lenth = len(y)
        return y

    def step(self, action):
        self.observation_lenth += self.obs_step
        if self.data_lenth <= self.observation_lenth:  # the window has reached the end of the data
            done = True
        else:
            done = False
        observation_ = self.x[self.observation_lenth - self.length_obs:self.observation_lenth]
        reward = 10 / (abs(observation_[-1] - action[0]) + 1)
        # print("predicted: %f" % (action), "actual: %f" % (observation_[-1]), reward)
        return observation_, reward, done

    def predic_step(self, action):
        self.observation_lenth += self.obs_step
        if self.data_lenth == self.observation_lenth:
            print("Last three days of the original data:", self.x[-3:])
        # Once past the end of the real data, append the predicted value
        if self.data_lenth < self.observation_lenth:
            self.x.append(float(action[0]))  # store the scalar prediction (the original code appended the whole array)
            self.predict_list.append(action[0])
        # Done once the window has covered the data plus the prediction horizon
        if (self.data_lenth + self.predict_lenth) <= self.observation_lenth:
            done = True
            print("Predicted values:", self.predict_list)
            plt.plot(list(range(len(self.predict_list))), self.predict_list)
            plt.xlabel("day")
            plt.ylabel("price")
            plt.show()
        else:
            done = False
        observation_ = self.x[self.observation_lenth - self.length_obs:self.observation_lenth]
        # Reward is the reciprocal of the absolute difference between the action and the
        # observed value: the smaller the difference, the larger the reward;
        # the factor 10 scales the reward
        reward = 10 / (abs(self.x[self.length_obs] - action[0]) + 1)
        return observation_, reward, done

    def reset(self):
        self.length_obs = 100
        self.observation_lenth = 100
        # Re-read the data without the appended predictions
        self.x = self.read_data()
        observation = self.x[0:self.observation_lenth]
        return observation
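To make the data flow concrete, here is a small usage sketch (not from the original post) of the environment above: reset() returns the first 100 closing prices, and each step() slides that 100-day window forward by one day, rewarding actions close to the newest closing price via reward = 10 / (|actual - predicted| + 1).

env = Env()
obs = env.reset()              # the first 100 closing prices
print(len(obs), obs[-3:])      # window size and its three most recent prices
action = [obs[-1]]             # dummy "prediction": just repeat the last observed price
obs, reward, done = env.step(action)
print(reward, done)            # close predictions give rewards near 10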

Next, train on the historical data and predict the closing-price trend for the following week:

"""
1.路径改为现在的模型路径
2.训练的模型数据和要预测的数据匹配,也可以在别人数据的基础上学习再预测
3.perice的价格在100以内,否则需要修环境中和此py文件中的action_down和action_up
"""
import tensorflow as tf
import numpy as np
from date_env import Env
import matplotlib.pyplot as plt
import time
import random
RENDER = False
C_UPDATE_STEPS = 1
A_UPDATE_STEPS = 1

class Experience_Buffer():
    def __init__(self, env, buffer_size=1000):
        self.buffer = []
        # Length of a state (number of features)
        self.n_features = env.observation_space_features
        self.buffer_size = buffer_size

    def add_experience(self, experience):
        if len(self.buffer) + len(experience) >= self.buffer_size:
            # Drop the oldest entries so the buffer never exceeds buffer_size
            self.buffer[0:(len(experience) + len(self.buffer)) - self.buffer_size] = []
        self.buffer.extend(experience)

    def sample(self, samples_num):
        """samples_num is the number of [s, a, r, s_] tuples to draw."""
        # Reshape the sampled tuples into a samples_num x 4 array
        sample_data = np.reshape(np.array(random.sample(self.buffer, samples_num)), [samples_num, 4])
        # First element of the first tuple, i.e. s
        train_s = np.array(sample_data[0, 0])
        # Fourth element of the first tuple, i.e. s_
        train_s_ = np.array(sample_data[0, 3])
        # Second and third elements of every tuple, i.e. a and r
        train_a = sample_data[:, 1]
        train_r = sample_data[:, 2]
        # Stack the remaining s and s_ vertically (vstack); a and r are already stacked
        for i in range(samples_num - 1):
            train_s = np.vstack((train_s, np.array(sample_data[i + 1, 0])))
            train_s_ = np.vstack((train_s_, np.array(sample_data[i + 1, 3])))
        # Each s consists of n_features numbers, so reshape to samples_num x n_features
        train_s = np.reshape(train_s, [samples_num, self.n_features])
        train_s_ = np.reshape(train_s_, [samples_num, self.n_features])
        train_r = np.reshape(train_r, [samples_num, 1])
        train_a = np.reshape(train_a, [samples_num, 1])
        # One sample is one training batch drawn at random from the buffer,
        # e.g. train_r = [[r1], [r2], ..., [r_samples_num]]
        return train_s, train_a, train_r, train_s_
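# Note (added for clarity): each transition is packed by policy_train below as a
# 1 x 4 object array [s, a, r, s_], where s and s_ are 100-price observation windows
# and a, r are scalars. add_experience() appends these rows and discards the oldest
# ones once buffer_size is exceeded; sample(batch) re-stacks them into a
# (batch x n_features) state matrix, a matching next-state matrix, and
# (batch x 1) action and reward columns ready to feed the networks.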
# Define the policy network (actor) together with the action-value network (critic)
class Policy_Net():
    def __init__(self, env, action_bound, lr=0.0001, model_file=None):
        self.action_bound = action_bound
        self.gamma = 0.90
        self.tau = 0.01
        # TensorFlow session
        self.sess = tf.Session()
        self.learning_rate = lr
        # Dimension of the input features (here 100 closing prices)
        self.n_features = env.observation_space_features
        # Dimension of the action space
        self.n_actions = 1
        # 1. Input layer
        self.obs = tf.placeholder(tf.float32, shape=[None, self.n_features])
        self.obs_ = tf.placeholder(tf.float32, shape=[None, self.n_features])
        # 2. Build the network model
        # 2.1 Policy networks, under the name scope 'actor'
        with tf.variable_scope('actor'):
            # Trainable policy network, variables under actor/eval;
            # self.see exposes the first layer for inspection and can be removed
            self.action, self.see = self.build_a_net(self.obs, scope='eval', trainable=True)
            # Target policy network, not trainable, variables under actor/target
            self.action_, self.see_ = self.build_a_net(self.obs_, scope='target', trainable=False)
        # 2.2 Action-value (Q) networks, under the name scope 'critic'
        with tf.variable_scope('critic'):
            # Trainable Q network, variables under critic/eval
            Q = self.build_c_net(self.obs, self.action, scope='eval', trainable=True)
            # Target Q network, not trainable, variables under critic/target
            Q_ = self.build_c_net(self.obs_, self.action_, scope='target', trainable=False)
        # 2.3 Collect the four sets of network parameters
        # 2.3.1 Trainable policy network parameters
        self.ae_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='actor/eval')
        # 2.3.2 Target policy network parameters: not trainable, but the trained
        # parameters can be assigned to them
        self.at_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='actor/target')
        # 2.3.3 Trainable Q network parameters
        self.ce_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='critic/eval')
        # 2.3.4 Target Q network parameters: not trainable, but they can be assigned
        self.ct_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='critic/target')
        # 2.4 Soft-update ops: assign a mix of the trainable parameters to the target networks
        self.update_olda_op = [olda.assign((1 - self.tau) * olda + self.tau * p)
                               for p, olda in zip(self.ae_params, self.at_params)]
        self.update_oldc_op = [oldc.assign((1 - self.tau) * oldc + self.tau * p)
                               for p, oldc in zip(self.ce_params, self.ct_params)]
        # 3. Build the loss functions
        # 3.1 Loss of the Q network
        self.R = tf.placeholder(tf.float32, [None, 1])
        Q_target = self.R + self.gamma * Q_
        self.c_loss = tf.losses.mean_squared_error(labels=Q_target, predictions=Q)
        # 3.2 Loss of the policy network: minimize the negative Q value,
        # i.e. push the policy toward actions with the largest Q value
        self.a_loss = -tf.reduce_mean(Q)
        # 4. Optimizers
        # 4.1 Actor optimizer; note it only updates the variables in ae_params
        self.a_train_op = tf.train.AdamOptimizer(self.learning_rate).minimize(self.a_loss, var_list=self.ae_params)
        # 4.2 Critic optimizer; note it only updates the variables in ce_params
        self.c_train_op = tf.train.AdamOptimizer(0.0002).minimize(self.c_loss, var_list=self.ce_params)
        # 5. Initialize the variables in the graph
        self.sess.run(tf.global_variables_initializer())
        # 6. Saving and restoring the model
        self.saver = tf.train.Saver()
        if model_file is not None:
            self.restore_model(model_file)

    def build_c_net(self, obs, action, scope, trainable):
        with tf.variable_scope(scope):
            c_l1 = 1000
            # Weights for the state: an n_features x c_l1 matrix
            w1_obs = tf.get_variable('w1_obs', [self.n_features, c_l1], trainable=trainable)
            # Weights for the action: an n_actions x c_l1 matrix
            w1_action = tf.get_variable('w1_action', [self.n_actions, c_l1], trainable=trainable)
            b1 = tf.get_variable('b1', [1, c_l1], trainable=trainable)
            # First hidden layer: matrix products of state and action plus bias, then relu
            c_f1 = tf.nn.relu(tf.matmul(obs, w1_obs) + tf.matmul(action, w1_action) + b1)
            # Second layer: the Q-value output, no activation
            c_out = tf.layers.dense(c_f1, units=1, trainable=trainable)
            return c_out

    def build_a_net(self, obs, scope, trainable):
        with tf.variable_scope(scope):
            # First hidden layer of the policy network; a_f1 is returned only for inspection
            a_f1 = tf.layers.dense(inputs=obs, units=5, activation=tf.nn.relu, trainable=trainable)
            # Second layer: the deterministic policy output; relu keeps it non-negative
            # and the result is clipped to the action range below
            a_out = tf.layers.dense(a_f1, units=self.n_actions, activation=tf.nn.relu, trainable=trainable)
            return tf.clip_by_value(a_out, self.action_bound[0], self.action_bound[1]), a_f1

    # Choose an action with the policy network
    def choose_action(self, state):
        action = self.sess.run(self.action, {self.obs: state})
        # (optional debug: inspect the first-layer activations via self.see)
        return action[0]

    # One training step
    def train_step(self, train_s, train_a, train_r, train_s_):
        for _ in range(A_UPDATE_STEPS):
            # Only s needs to be fed, because the a in Q(s, a) used by a_train_op
            # is itself produced from s
            self.sess.run(self.a_train_op, feed_dict={self.obs: train_s})
        for _ in range(C_UPDATE_STEPS):
            self.sess.run(self.c_train_op, feed_dict={self.obs: train_s, self.action: train_a,
                                                      self.R: train_r, self.obs_: train_s_})
        # (optional debug: evaluate self.c_loss with the same feed_dict to watch the loss)
        # Softly update the target networks
        self.sess.run(self.update_oldc_op)
        self.sess.run(self.update_olda_op)

    # Save the model
    def save_model(self, model_path, global_step):
        self.saver.save(self.sess, model_path, global_step=global_step)

    # Restore the model
    def restore_model(self, model_path):
        self.saver.restore(self.sess, model_path)
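# Note (added for clarity): the updates implemented above are the standard DDPG ones.
# The critic is regressed toward the target y = R + gamma * Q'(s', mu'(s')) with
# gamma = 0.90 (c_loss is the mean-squared error between y and Q(s, a)); the actor
# is trained to maximize Q(s, mu(s)), hence a_loss = -mean(Q); and after every
# training step the target networks are moved slowly toward the trained ones with
# theta_target <- (1 - tau) * theta_target + tau * theta, tau = 0.01
# (the update_olda_op / update_oldc_op lists).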
def policy_train(env, brain, exp_buffer, training_num):
    average_reward_line = []
    training_time = []
    average_reward = 0
    batch = 32
    for i in range(training_num):
        total_reward = 0
        # Reset the environment
        observation = env.reset()
        done = False
        while True:
            # Exploration noise, decaying with the episode number
            var = 3 * np.exp(-i / 100)
            state = np.reshape(observation, [1, brain.n_features])
            # Choose an action with the policy network
            action = brain.choose_action(state)
            # Add noise to the action for exploration; np.clip keeps it inside the bounds
            action = np.clip(np.random.normal(action, var), action_bound[0], action_bound[1])
            obeservation_next, reward, done = env.step(action)
            # Store one transition; dtype=object because the entries have different shapes
            experience = np.reshape(np.array([observation, action[0], reward, obeservation_next], dtype=object), [1, 4])
            exp_buffer.add_experience(experience)
            if len(exp_buffer.buffer) > batch:
                # Sample a batch from the buffer and train on it
                train_s, train_a, train_r, train_s_ = exp_buffer.sample(batch)
                brain.train_step(train_s, train_a, train_r, train_s_)
            # Advance one step
            observation = obeservation_next
            total_reward += reward
            if done:
                break
        if i == 0:
            average_reward = total_reward
        else:
            average_reward = 0.95 * average_reward + 0.05 * total_reward
        print("Average reward after episode %d: %f" % (i, average_reward))
        average_reward_line.append(average_reward)
        training_time.append(i)
        # Save the model every 50 episodes
        if i > 1 and i % 50 == 0:
            brain.save_model('net/', 2732)
    plt.plot(training_time, average_reward_line)
    plt.xlabel("training number")
    plt.ylabel("score")
    plt.show()
def policy_test(env, policy, test_num):
    for i in range(test_num):
        reward_sum = 0
        observation = env.reset()
        print("Test %d, first three days of data: %f, %f, %f" % (i, observation[0], observation[1], observation[2]))
        # Accumulate the reward of one episode
        while True:
            # Produce an action with the policy network
            state = np.reshape(observation, [1, 100])
            action = policy.choose_action(state)
            observation_, reward, done = env.predic_step(action)
            reward_sum += reward
            if done:
                print("Total reward of test %d: %f" % (i, reward_sum))
                break
            observation = observation_
    # return reward_sum

if __name__ == '__main__':
    # Create the environment
    env = Env()
    action_bound = [env.action_down, env.action_up]
    # Instantiate the policy network
    # brain = Policy_Net(env, action_bound)
    # The line above can be replaced by the one below to load the current best model
    # and continue learning or testing from it (you can also build on a model trained
    # on someone else's data)
    brain = Policy_Net(env, action_bound, model_file='D:\\Simulation\\优秀项目\\Stock\\(ddpg)stock_predict\\net\\-2732')
    # Experience buffer
    exp_buffer = Experience_Buffer(env)
    training_num = 500
    # Train the policy network; comment this out once training is done
    policy_train(env, brain, exp_buffer, training_num)
    # Test the trained network
    reward_sum = policy_test(env, brain, 1)
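A typical workflow with this script: on the first run, use the plain Policy_Net(env, action_bound) line (no model_file) and let policy_train write checkpoints under net/; on later runs, once a usable checkpoint exists, comment out policy_train and keep only policy_test, which loads the saved weights and plots the 15-day forecast produced by predic_step. Note that the checkpoint and csv paths above are the author's local ones and need to be adapted to your machine.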
