在进行一项新任务时,如果有一位专家突然评论说做得不错,继续保持,这无疑会极大地提升完成任务的动力,减少对行动的焦虑。对于机器学习(特别是深度学习)来说,如果在任务进行中,有人告诉,在使用神经网络时,学习率看起来不错,这难道不会让感到轻松吗?这难道不会让对学习率这个超参数的不确定性减少吗?在发现学习率调度器后,确实感到非常高兴。
在深度学习的术语中,学习率是在反向传播过程中从参数中减少的梯度的系数,用于调整参数以最小化代价函数。用外行的话来说,它表示在每个训练周期(前向传播和反向传播)后,希望参数发生多大的变化。
今天,在探索和观察学习率调度器的行动中,将使用数学生成的合成时间序列数据。已经为这个数据集和时间序列数据写了一篇专门的文章,这将帮助开始深入学习时间序列数据。
学习率调度器是Keras API(TensorFlow)中的一个回调函数。回调是在训练过程中根据每个特定回调在特定点被调用的实用工具。当训练神经网络时,这些回调在训练之间被调用以执行它们各自的任务。例如,在例子中,在每个时代的开始,学习率调度器回调从事先定义的时间表函数中获取更新的学习率值,该函数根据当前时代和当前学习率,并在优化器上应用更新的学习率。
首先,导入所有必要的库:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
其次,创建一个辅助函数来绘制时间序列数据,以避免冗余代码:
def plot_series(time, series, format='-', start=0, end=None):
plt.plot(time[start:end], series[start:end], format)
plt.xlabel("Time")
plt.ylabel("Series")
plt.grid(True)
最后,让定义那些将帮助生成合成时间序列数据的函数。对于它们是如何创建的以及它们代表什么的更多理解,强烈推荐查看这篇关于时间序列数据深入研究的文章。
以下是这些函数:
def trend(time, slope=0):
return time * slope
def seasonal_pattern(season_time):
return np.where(season_time < 0.4,
np.cos(season_time * 2 * np.pi),
1 / np.exp(3 * season_time))
def seasonality(time, period, amplitude=1, phase=0):
season_time = ((time + phase) % period) / period
return amplitude * seasonal_pattern(season_time)
def noise(time, noise_level=1, seed=None):
rnd = np.random.RandomState(seed)
return rnd.randn(len(time)) * noise_level
将所有这些特性与创建时间序列数据的逻辑连接起来(不用担心背后的数学,这不在本文的讨论范围内)。
time = np.arange(4 * 365 + 1, dtype='float32')
baseline = 10
series = trend(time, slope=0.1)
amplitude = 20
slope = 0.09
noise_level = 5
series = baseline + trend(time, slope) + seasonality(time, period=365, amplitude=amplitude)
series += noise(time, noise_level, seed=42)
使用定义的辅助函数‘plot_series()’查看时间序列:
plot_series(time, series)
2. 准备训练和测试数据集
首先,将时间序列划分为训练和测试数据。记住,在时间序列数据中,值的序列非常重要,因为它们是特征和表示,用于预测值。
其次,在训练-测试拆分之后,将使用TensorFlow数据集API创建一个窗口化的数据集,这几乎是每次处理时间序列和非序列特定神经网络(如RNN或LSTM)时都会做的事情。有关窗口化数据集的更多详细信息,请查看深入研究时间序列数据的文章。
split_time = 1000
train_series = series[:split_time]
train_time = time[:split_time]
valid_series = series[split_time:]
valid_time = time[split_time:]
def windowed_dataset(series, window_size, batch_size, shuffle_batch_size):
dataset = tf.data.Dataset.from_tensor_slices(series)
dataset = dataset.window(window_size+1, 1, drop_remainder=True)
dataset = dataset.flat_map(lambda window: window.batch(window_size+1))
dataset = dataset.shuffle(shuffle_batch_size).map(lambda window: (window[:-1], window[-1:]))
dataset = dataset.batch(batch_size).prefetch(1)
return dataset
window_size = 20
batch_size = 32
shuffle_batch_size = 1000
dataset = windowed_dataset(train_series, window_size, batch_size, shuffle_batch_size)
3. 神经网络和学习率调度器
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(10, activation='relu', input_shape=[window_size]),
tf.keras.layers.Dense(10, activation='relu'),
tf.keras.layers.Dense(1)
])
lr_schedule = tf.keras.callbacks.LearningRateScheduler(
lambda epoch: 1e-8 * 10**(epoch/20)
)
model.compile(loss='mse', optimizer=tf.keras.optimizers.SGD(1e-8, momentum=0.9))
history = model.fit(dataset, epochs=100, callbacks=[lr_schedule], verbose=0)
代码的第一部分是创建一个具有3个全连接层的神经网络,前两层使用‘relu’激活函数。
第二部分是之前提到的调度器函数,它在训练过程中由学习率调度器回调调用,以改变其学习率。在这里,这个函数将学习率从1e-8改变到1e-3。
第三部分是网络的简单编译,同时在model.fit函数中有一个额外的参数‘callbacks’,需要提供一个包含回调的迭代器,这些回调将在训练期间被调用,即lr_schedule。
注意:这个model.fit并不是实际在给定数据集上训练模型,所以不要担心看到损失值的不寻常变化,它只是用来实验不同的学习率,这些学习率在训练过程中会自动改变。损失函数中噪声最小的将是理想的学习率。
模型拟合完成后,history变量中存储了有关当前训练过程的某些值。让可视化损失曲线,以找到曲线中最低的平滑值,x轴以对数范围显示(因为学习率的变化远远超过损失的变化)。
lrs = 1e-8 * (10**(np.arange(100)/20))
plt.semilogx(lrs, history.history['loss'])
plt.axis([1e-8, 1e-3, 0, 300])
这是一个视觉选择,选择认为最低且保持平滑的值。在例子中,选择了图中提到的点(3e-6)。
在决定学习率后,现在是时候让真正开始在给定数据集上训练模型了。
4. 训练和测试模型
现在注意,不会使用lr_schedule作为回调,因为现在在训练过程中不需要改变学习率,因为已经通过实验得到了几乎最优的学习率。
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(10, activation='relu', input_shape=[window_size]),
tf.keras.layers.Dense(10, activation='relu'),
tf.keras.layers.Dense(1)
])
model.compile(loss='mse', optimizer=tf.keras.optimizers.SGD(3e-6, momentum=0.9)) ## 从图中选择了3e-6
history = model.fit(dataset, epochs=50, verbose=0)
请注意,通过选择一个优秀的学习率,赋予了网络多大的力量,训练完成后,模型收敛得有多快!在10个时代里,几乎得到了最优的损失,并且在接下来的10个时代里,这条线看起来几乎是平的。
loss = history.history['loss']
epochs = range(len(loss))
plt.figure(dpi=150)
plt.plot(epochs, loss, 'b', label="training_loss")
plt.show()
测试/预测模型:
forecast = []
for time in range(len(series) - window_size):
forecast.append(model.predict(series[time:time + window_size][np.newaxis]))
forecast = forecast[split_time - window_size:]
results = np.array(forecast)[:, 0, 0]
测试时间序列窗口化数据集的方式与可能习惯的方式略有不同。要了解更多信息,请查看上述文章:深入研究时间序列数据。
可视化预测:
plt.figure(figsize=(10, 6))
plot_series(valid_time, valid_series)
plot_series(valid_time, results)
作为一个指标,可以查看平均绝对损失,看看模型表现得如何:
tf.keras.metrics.mean_absolute_error(valid_series, results).numpy()