异常检测,又称为离群点检测,是指在数据集中识别那些显著偏离一般行为的数据点的过程。异常数据通常指那些不符合常规模式的数据。异常检测的应用非常广泛,包括但不限于欺诈检测、故障检测和入侵检测等。在异常检测中,训练数据既包含异常也包含正常观测值,而在新颖性检测中,训练数据仅包含正常观测值,不包含异常观测值。本文将探讨新颖性检测的一个用例。
自编码器
自编码器是一种无监督的人工神经网络,它通过将数据压缩到更低维度(瓶颈层或编码层)然后解码数据以重建原始输入来尝试编码数据。瓶颈层(或编码)保存输入数据的压缩表示。编码中的隐藏单元数量称为编码大小。自编码器的应用包括降维、异常检测、图像去噪、图像压缩和图像生成等。本文将深入探讨使用自编码器进行异常检测的方法。
使用TensorFlow的自编码器进行异常检测
自编码器在异常检测中被广泛使用。重建误差被用作异常分数。让看看如何使用TensorFlow中的自编码器进行异常检测。首先,导入所需的库并加载数据。这里使用的是ECG数据,包含0和1两个标签。标签0表示观测值为异常,标签1表示观测值为正常。
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import Dense, Dropout
from sklearn.model_selection import train_test_split
from tensorflow.keras.losses import MeanSquaredLogarithmicError
下载数据集并查看数据形状,最后一列是目标列,0表示异常,1表示正常。将数据分为训练和测试集,并使用MinMaxScaler进行数据缩放。
PATH_TO_DATA = 'http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv'
data = pd.read_csv(PATH_TO_DATA, header=None)
data.head()
# 数据形状
# (4998, 141)
OUTPUT = 140
features = data.drop(OUTPUT, axis=1)
target = data[OUTPUT]
x_train, x_test, y_train, y_test = train_test_split(features, target, test_size=0.2, stratify=target)
由于使用案例是新颖性检测,因此仅使用正常数据进行训练。对输入数据进行最小最大缩放。
min_max_scaler = MinMaxScaler(feature_range=(0, 1))
x_train_scaled = min_max_scaler.fit_transform(x_train)
x_test_scaled = min_max_scaler.transform(x_test)
通过继承TensorFlow中的Model类创建模型。模型由编码器和解码器组成,编码器将数据编码到更低维度,解码器则重建输入数据。
class AutoEncoder(Model):
def __init__(self, output_units, code_size=8):
super().__init__()
self.encoder = Sequential([
Dense(64, activation='relu'),
Dropout(0.1),
Dense(32, activation='relu'),
Dropout(0.1),
Dense(16, activation='relu'),
Dropout(0.1),
Dense(code_size, activation='relu')
])
self.decoder = Sequential([
Dense(16, activation='relu'),
Dropout(0.1),
Dense(32, activation='relu'),
Dropout(0.1),
Dense(64, activation='relu'),
Dropout(0.1),
Dense(output_units, activation='sigmoid')
])
def call(self, inputs):
encoded = self.encoder(inputs)
decoded = self.decoder(encoded)
return decoded
模型使用均方对数误差损失函数和Adam优化器进行编译,然后进行训练。
model = AutoEncoder(output_units=x_train_scaled.shape[1])
model.compile(loss='msle', metrics=['mse'], optimizer='adam')
history = model.fit(x_train_scaled, x_train_scaled, epochs=20, batch_size=512, validation_data=(x_test_scaled, x_test_scaled))
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.xlabel('Epochs')
plt.ylabel('MSLE Loss')
plt.legend(['loss', 'val_loss'])
plt.show()
def find_threshold(model, x_train_scaled):
reconstructions = model.predict(x_train_scaled)
reconstruction_errors = tf.keras.losses.msle(reconstructions, x_train_scaled)
threshold = np.mean(reconstruction_errors.numpy()) + np.std(reconstruction_errors.numpy())
return threshold
def get_predictions(model, x_test_scaled, threshold):
predictions = model.predict(x_test_scaled)
errors = tf.keras.losses.msle(predictions, x_test_scaled)
anomaly_mask = pd.Series(errors) > threshold
preds = anomaly_mask.map(lambda x: 0.0 if x else 1.0)
return preds
threshold = find_threshold(model, x_train_scaled)
predictions = get_predictions(model, x_test_scaled, threshold)
accuracy_score(predictions, y_test)