异常检测与自编码器的应用

异常检测,又称为离群点检测,是指在数据集中识别那些显著偏离一般行为的数据点的过程。异常数据通常指那些不符合常规模式的数据。异常检测的应用非常广泛,包括但不限于欺诈检测、故障检测和入侵检测等。在异常检测中,训练数据既包含异常也包含正常观测值,而在新颖性检测中,训练数据仅包含正常观测值,不包含异常观测值。本文将探讨新颖性检测的一个用例。

自编码器

自编码器是一种无监督的人工神经网络,它通过将数据压缩到更低维度(瓶颈层或编码层)然后解码数据以重建原始输入来尝试编码数据。瓶颈层(或编码)保存输入数据的压缩表示。编码中的隐藏单元数量称为编码大小。自编码器的应用包括降维、异常检测、图像去噪、图像压缩和图像生成等。本文将深入探讨使用自编码器进行异常检测的方法。

使用TensorFlow的自编码器进行异常检测

自编码器异常检测中被广泛使用。重建误差被用作异常分数。让看看如何使用TensorFlow中的自编码器进行异常检测。首先,导入所需的库并加载数据。这里使用的是ECG数据,包含0和1两个标签。标签0表示观测值为异常,标签1表示观测值为正常。

import numpy as np import pandas as pd import tensorflow as tf import matplotlib.pyplot as plt from sklearn.metrics import accuracy_score from tensorflow.keras.optimizers import Adam from sklearn.preprocessing import MinMaxScaler from tensorflow.keras import Model, Sequential from tensorflow.keras.layers import Dense, Dropout from sklearn.model_selection import train_test_split from tensorflow.keras.losses import MeanSquaredLogarithmicError

下载数据集并查看数据形状,最后一列是目标列,0表示异常,1表示正常。将数据分为训练和测试集,并使用MinMaxScaler进行数据缩放。

PATH_TO_DATA = 'http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv' data = pd.read_csv(PATH_TO_DATA, header=None) data.head() # 数据形状 # (4998, 141) OUTPUT = 140 features = data.drop(OUTPUT, axis=1) target = data[OUTPUT] x_train, x_test, y_train, y_test = train_test_split(features, target, test_size=0.2, stratify=target)

由于使用案例是新颖性检测,因此仅使用正常数据进行训练。对输入数据进行最小最大缩放。

min_max_scaler = MinMaxScaler(feature_range=(0, 1)) x_train_scaled = min_max_scaler.fit_transform(x_train) x_test_scaled = min_max_scaler.transform(x_test)

通过继承TensorFlow中的Model类创建模型。模型由编码器和解码器组成,编码器将数据编码到更低维度,解码器则重建输入数据。

class AutoEncoder(Model): def __init__(self, output_units, code_size=8): super().__init__() self.encoder = Sequential([ Dense(64, activation='relu'), Dropout(0.1), Dense(32, activation='relu'), Dropout(0.1), Dense(16, activation='relu'), Dropout(0.1), Dense(code_size, activation='relu') ]) self.decoder = Sequential([ Dense(16, activation='relu'), Dropout(0.1), Dense(32, activation='relu'), Dropout(0.1), Dense(64, activation='relu'), Dropout(0.1), Dense(output_units, activation='sigmoid') ]) def call(self, inputs): encoded = self.encoder(inputs) decoded = self.decoder(encoded) return decoded

模型使用均方对数误差损失函数和Adam优化器进行编译,然后进行训练。

model = AutoEncoder(output_units=x_train_scaled.shape[1]) model.compile(loss='msle', metrics=['mse'], optimizer='adam') history = model.fit(x_train_scaled, x_train_scaled, epochs=20, batch_size=512, validation_data=(x_test_scaled, x_test_scaled)) plt.plot(history.history['loss']) plt.plot(history.history['val_loss']) plt.xlabel('Epochs') plt.ylabel('MSLE Loss') plt.legend(['loss', 'val_loss']) plt.show() def find_threshold(model, x_train_scaled): reconstructions = model.predict(x_train_scaled) reconstruction_errors = tf.keras.losses.msle(reconstructions, x_train_scaled) threshold = np.mean(reconstruction_errors.numpy()) + np.std(reconstruction_errors.numpy()) return threshold def get_predictions(model, x_test_scaled, threshold): predictions = model.predict(x_test_scaled) errors = tf.keras.losses.msle(predictions, x_test_scaled) anomaly_mask = pd.Series(errors) > threshold preds = anomaly_mask.map(lambda x: 0.0 if x else 1.0) return preds threshold = find_threshold(model, x_train_scaled) predictions = get_predictions(model, x_test_scaled, threshold) accuracy_score(predictions, y_test)
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485