返回到文章

采纳

编辑于

使用RNN网络拟合回声信号序列

TensorFlow
循环神经网络

本例使用TensorFlow中的函数来演示搭建一个简单RNN网络,使用一串随机的模拟数据作为原始信号,让RNN网络来拟合其对应的回声信号。详细介绍如下。

样本数据为一串随机的由0、1组成的数字,将其当成发射出去的一串信号。当碰到阻挡被反弹回来时,会收到原始信号的回音。如果步长为3,那么输入和输出的序列如图所示。

screenshot

如图所示,回声序列的前3项是null,原序列的第一个信号0,对应的是回声序列的第4项,即回声序列的每一个数都会比原序列滞后3个时序。本例的任务就是将序列截取出来,对于每个原序列来预测它的回声序列。

构建的网络结构如图所示。

screenshot

图中,初始的输入有5个,其中4个是中间状态,1个是x的序列值。通过一层具有4个节点的RNN网络,再接一个全连接层输出01分类。这样序列中的每个x都会有一个对应的预测分类值,最终将整个序列x生成了预测序列。具体步骤如下。

实例描述

构建一组序列,生成其对应的模拟回声序列。使用TensorFlow创建一个简单循环神经网络拟合这个回声序列。

1.定义参数生成样本数据在了解前面样本的规则后,开始编写代码制作样本。

代码 echo模拟

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

num_epochs = 5
total_series_length = 50000
truncated_backprop_length = 15
state_size = 4
num_classes = 2
echo_step = 3
batch_size = 5
num_batches = total_series_length // batch_size // truncated_backprop_length

def generateData():
    x = np.array(np.random.choice(2, total_series_length, p=[0.5, 0.5]))  # 在0 和1 中选择total_series_length个数
    y = np.roll(x, echo_step)  # 向右循环移位,将【1111000】变为【0001111】
    y[0:echo_step] = 0

    x = x.reshape((batch_size, -1))  # 5,10000
    y = y.reshape((batch_size, -1))

    return (x, y)

导入Python库,定义相关参数,取50000个序列样本数据,每个测试数据截取15个序列,回声序列的步长为3,最小批次为5。定义生成样本函数generateData,在函数里先随机生成50000个0、1数据的数组x,作为原始的序列,令x里的数据向右循环移动3个位置,生成数据y,作为x的回声序列。因为回声步长是3,表明回声y是从x的第3个数据开始才出现,所以将y的前3个数据清零。

2.定义占位符处理输入数据

代码 echo模拟(续)

batchX_placeholder = tf.placeholder(tf.float32, [batch_size, truncated_backprop_length])
batchY_placeholder = tf.placeholder(tf.int32, [batch_size, truncated_backprop_length])
init_state = tf.placeholder(tf.float32, [batch_size, state_size])

# 将batchX_Placeholder沿维度为1的轴方向进行拆分
inputs_series = tf.unstack(batchX_placeholder, axis=1)  # truncated_backprop_length个序列
labels_series = tf.unstack(batchY_placeholder, axis=1)

定义3个占位符,输入的batchX_placeholder原始序列,回声batchY_placeholder作为标签,循环节点的初始值state。如前面介绍的网络结构,x的原始序列是逐个输入网络的,所以需要将输进去的数据打散,按照时间序列变成15个数组,每个数组有batch_size个元素,进行统一批处理。

3.定义网络结构

按照上图中的网络结构,定义一层循环网络与一层全连接网络。由于数据是一个数组序列,所以需要通过循环将输入数据按照原有序列逐个输入网络,并输出对应的predictions序列。同样的,对于每个序列值都要对其输出做loss计算,在loss中使用了sparse_softmax_cross_entropy_with_logits函数,因为label的最大值正好是1,而且是一位的,就不需要再转成one_hot编码了(具体细节见本书6.5.3节),最终将所有的loss取均值放入优化器中。

代码 echo模拟(续)

current_state = init_state
predictions_series = []
losses = []
# 使用一个循环,按照序列逐个输入
for current_input, labels in zip(inputs_series, labels_series):
    current_input = tf.reshape(current_input, [batch_size, 1])
    # 加入初始状态
    input_and_state_concatenated = tf.concat([current_input, current_state], 1)

    next_state = tf.contrib.layers.fully_connected(input_and_state_concatenated, state_size, activation_fn=tf.tanh)
    current_state = next_state
    logits = tf.contrib.layers.fully_connected(next_state, num_classes, activation_fn=None)
    loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits)
    losses.append(loss)
    predictions = tf.nn.softmax(logits)
    predictions_series.append(predictions)

total_loss = tf.reduce_mean(losses)
train_step = tf.train.AdagradOptimizer(0.3).minimize(total_loss)

4.建立session训练数据

建立session,总样本循环10次进行迭代。将初始化循环神经网络的状态设为0,在总样本中循环读取15个序列作为批次中的一个样本。

代码 echo模拟(续)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    plt.ion()
    plt.figure()
    plt.show()
    loss_list = []

    for epoch_idx in range(num_epochs):
        x, y = generateData()
        _current_state = np.zeros((batch_size, state_size))

        print("New data, epoch", epoch_idx)

        for batch_idx in range(num_batches):  # 50000/ 5 /15=分成多少段
            start_idx = batch_idx * truncated_backprop_length
            end_idx = start_idx + truncated_backprop_length

            batchX = x[:, start_idx:end_idx]
            batchY = y[:, start_idx:end_idx]

            _total_loss, _train_step, _current_state, _predictions_series = sess.run(
                [total_loss, train_step, current_state, predictions_series],
                feed_dict={
                    batchX_placeholder: batchX,
                    batchY_placeholder: batchY,
                    init_state: _current_state
                })

            loss_list.append(_total_loss)

5.测试模型及可视化

每循环100次,将打印数据并调用plot函数生成图像。

代码 echo模拟(续)

            if batch_idx%100 == 0:
                print("Step",batch_idx, "Loss", _total_loss)
                plot(loss_list, _predictions_series, batchX, batchY)

plt.ioff()
plt.show()

plot函数定义如下:

def plot(loss_list, predictions_series, batchX, batchY):
    plt.subplot(2, 3, 1)
    plt.cla()
    plt.plot(loss_list)

    for batch_series_idx in range(batch_size):
        one_hot_output_series = np.array(predictions_series)[:, batch_series_idx, :]
        single_output_series = np.array([(1 if out[0] < 0.5 else 0) for out in one_hot_output_series])

        plt.subplot(2, 3, batch_series_idx + 2)
        plt.cla()
        plt.axis([0, truncated_backprop_length, 0, 2])
        left_offset = range(truncated_backprop_length)
        left_offset2 = range(echo_step,truncated_backprop_length+echo_step)

        label1 = "past values"
        label2 = "True echo values" 
        label3 = "Predictions"      
        plt.plot(left_offset2, batchX[batch_series_idx, :]*0.2+1.5, "o--b", label=label1)
        plt.plot(left_offset, batchY[batch_series_idx, :]*0.2+0.8,"x--b", label=label2)
        plt.plot(left_offset,  single_output_series*0.2+0.1 , "o--y", label=label3)

    plt.legend(loc='best')
    plt.draw()
    plt.pause(0.0001)

函数中将输入的x序列、回声y序列和预测的序列同时打印到图像中。按照批次的个数生成图像。为了让3个序列看起来更明显,将其缩放0.2,并且调节每个图像的高度。同时将第一个原始序列的x在显示中滞后echo_step个序列,将3个图像放在同一序列顺序中比较。

运行代码,生成如下结果,如下图所示。

New data, epoch 0
Step 0 Loss 0.760327
Step 100 Loss 0.462219
Step 200 Loss 0.364076
……
New data, epoch 4
Step 0 Loss 0.324354
Step 100 Loss 0.103451
Step 200 Loss 0.0894693
Step 300 Loss 0.0940791
Step 400 Loss 0.09462
Step 500 Loss 0.10184
Step 600 Loss 0.0910746

screenshot

最下面的点为预测的序列,中间的为回声序列,从图像中可以看到预测序列和回声序列几乎相同,表明RNN网络已经完全可以学到回声的规则。