本例使用TensorFlow中的函数来演示搭建一个简单RNN网络,使用一串随机的模拟数据作为原始信号,让RNN网络来拟合其对应的回声信号。详细介绍如下。
样本数据为一串随机的由0、1组成的数字,将其当成发射出去的一串信号。当碰到阻挡被反弹回来时,会收到原始信号的回音。如果步长为3,那么输入和输出的序列如图所示。
如图所示,回声序列的前3项是null,原序列的第一个信号0,对应的是回声序列的第4项,即回声序列的每一个数都会比原序列滞后3个时序。本例的任务就是将序列截取出来,对于每个原序列来预测它的回声序列。
构建的网络结构如图所示。
图中,初始的输入有5个,其中4个是中间状态,1个是x的序列值。通过一层具有4个节点的RNN网络,再接一个全连接层输出0
、1
分类。这样序列中的每个x都会有一个对应的预测分类值,最终将整个序列x生成了预测序列。具体步骤如下。
实例描述
构建一组序列,生成其对应的模拟回声序列。使用TensorFlow创建一个简单循环神经网络拟合这个回声序列。
1.定义参数生成样本数据在了解前面样本的规则后,开始编写代码制作样本。
代码 echo模拟
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
num_epochs = 5
total_series_length = 50000
truncated_backprop_length = 15
state_size = 4
num_classes = 2
echo_step = 3
batch_size = 5
num_batches = total_series_length // batch_size // truncated_backprop_length
def generateData():
x = np.array(np.random.choice(2, total_series_length, p=[0.5, 0.5])) # 在0 和1 中选择total_series_length个数
y = np.roll(x, echo_step) # 向右循环移位,将【1111000】变为【0001111】
y[0:echo_step] = 0
x = x.reshape((batch_size, -1)) # 5,10000
y = y.reshape((batch_size, -1))
return (x, y)
导入Python库,定义相关参数,取50000个序列样本数据,每个测试数据截取15个序列,回声序列的步长为3,最小批次为5。定义生成样本函数generateData,在函数里先随机生成50000个0、1数据的数组x,作为原始的序列,令x里的数据向右循环移动3个位置,生成数据y,作为x的回声序列。因为回声步长是3,表明回声y是从x的第3个数据开始才出现,所以将y的前3个数据清零。
2.定义占位符处理输入数据
代码 echo模拟(续)
batchX_placeholder = tf.placeholder(tf.float32, [batch_size, truncated_backprop_length])
batchY_placeholder = tf.placeholder(tf.int32, [batch_size, truncated_backprop_length])
init_state = tf.placeholder(tf.float32, [batch_size, state_size])
# 将batchX_Placeholder沿维度为1的轴方向进行拆分
inputs_series = tf.unstack(batchX_placeholder, axis=1) # truncated_backprop_length个序列
labels_series = tf.unstack(batchY_placeholder, axis=1)
定义3个占位符,输入的batchX_placeholder原始序列,回声batchY_placeholder作为标签,循环节点的初始值state。如前面介绍的网络结构,x的原始序列是逐个输入网络的,所以需要将输进去的数据打散,按照时间序列变成15个数组,每个数组有batch_size个元素,进行统一批处理。
3.定义网络结构
按照上图中的网络结构,定义一层循环网络与一层全连接网络。由于数据是一个数组序列,所以需要通过循环将输入数据按照原有序列逐个输入网络,并输出对应的predictions序列。同样的,对于每个序列值都要对其输出做loss计算,在loss中使用了sparse_softmax_cross_entropy_with_logits函数,因为label的最大值正好是1,而且是一位的,就不需要再转成one_hot编码了(具体细节见本书6.5.3节),最终将所有的loss取均值放入优化器中。
代码 echo模拟(续)
current_state = init_state
predictions_series = []
losses = []
# 使用一个循环,按照序列逐个输入
for current_input, labels in zip(inputs_series, labels_series):
current_input = tf.reshape(current_input, [batch_size
predictions_series = []
lossescurrent_state = init_state
predictions_series = []
losses = []
# 使用一个循环,按照序列逐个输入
for current_inputcurrent_state = init_state
predictions_series = []
losses = []
# 使用一个循环,按照序列逐个输入
for current_input, labels in zip(inputs_seriescurrent_state = init_state
predictions_series = []
losses = []
# 使用一个循环,按照序列逐个输入
forcurrent_state = init_state
predictions_series = []
losses = []
# 使用一个循环,按照序列逐个输入
for current_input, labels in zip(inputs_series, labels_series):
current_input = tf.reshape(current_input, [batch_size, 1])
# 加入初始状态
input_and_state_concatenated = tf.concat([current_input, current_state], 1)
next_state = tf.contrib.layers.fully_connected(input_and_state_concatenated, state_size, activation_fn=tf.tanh)
current_state = next_state
logits = tf.contrib.layers.fully_connected(next_state, num_classes, activation_fn=None)
loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits)
losses.append(loss)
predictions = tf.nn.softmax(logits)
predictions_series.append(predictions)
total_loss = tf.reduce_mean(losses)
train_step = tf.train.AdagradOptimizer(0.3).minimize(total_loss)
4.建立session训练数据
建立session,总样本循环10次进行迭代。将初始化循环神经网络的状态设为0,在总样本中循环读取15个序列作为批次中的一个样本。
代码 echo模拟(续)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
plt.ion()
plt.figure()
plt.show()
loss_list = []
for epoch_idx in range(num_epochs):
x, y = generateData()
_current_state = np.zeros((batch_size, state_size))
print("New data, epoch", epoch_idx)
for batch_idx in range(num_batches): # 50000/ 5 /15=分成多少段
start_idx = batch_idx * truncated_backprop_length
end_idx = start_idx + truncated_backprop_length
batchX = x[:, start_idx:end_idx]
batchY = y[:, start_idx:end_idx]
_total_loss, _train_step, _current_state, _predictions_series = sess.run(
[total_loss, train_step, current_state, predictions_series],
feed_dict={
batchX_placeholder: batchX,
batchY_placeholder: batchY,
init_state: _current_state
})
loss_list.append(_total_loss)
5.测试模型及可视化
每循环100次,将打印数据并调用plot函数生成图像。
代码 echo模拟(续)
if batch_idx%100 == 0:
print("Step",batch_idx, "Loss", _total_loss)
plot(loss_list, _predictions_series, batchX, batchY)
plt.ioff()
plt.show()
plot函数定义如下:
def plot(loss_list, predictions_series, batchX, batchY):
plt.subplot(2, 3, 1)
plt.cla()
plt.plot(loss_list)
for batch_series_idx in range(batch_size):
one_hot_output_series = np.array(predictions_series)[:, batch_series_idx, :]
single_output_series = np.array([(1 if out[0] < 0.5 else 0) for out in one_hot_output_series])
plt.subplot(2, 3, batch_series_idx + 2)
plt.cla()
plt.axis([0, truncated_backprop_length, 0, 2])
left_offset = range(truncated_backprop_length)
left_offset2 = range(echo_step,truncated_backprop_length+echo_step)
label1 = "past values"
label2 = "True echo values"
label3 = "Predictions"
plt.plot(left_offset2, batchX[batch_series_idx, :]*0.2+1.5, "o--b", label=label1)
plt.plot(left_offset, batchY[batch_series_idx, :]*0.2+0.8,"x--b", label=label2)
plt.plot(left_offset, single_output_series*0.2+0.1 , "o--y", label=label3)
plt.legend(loc='best')
plt.draw()
plt.pause(0.0001)
函数中将输入的x序列、回声y序列和预测的序列同时打印到图像中。按照批次的个数生成图像。为了让3个序列看起来更明显,将其缩放0.2,并且调节每个图像的高度。同时将第一个原始序列的x在显示中滞后echo_step个序列,将3个图像放在同一序列顺序中比较。
运行代码,生成如下结果,如下图所示。
New data, epoch 0
Step 0 Loss 0.760327
Step 100 Loss 0.462219
Step 200 Loss 0.364076
……
New data, epoch 4
Step 0 Loss 0.324354
Step 100 Loss 0.103451
Step 200 Loss 0.0894693
Step 300 Loss 0.0940791
Step 400 Loss 0.09462
Step 500 Loss 0.10184
Step 600 Loss 0.0910746
最下面的点为预测的序列,中间的为回声序列,从图像中可以看到预测序列和回声序列几乎相同,表明RNN网络已经完全可以学到回声的规则。