异常检测技术在时间序列数据分析中的应用

异常检测是一种识别数据中显著偏离正常观察值的技术。它在诸如欺诈检测、垃圾邮件过滤、股市价格异常等多个领域都有应用。通过识别异常值,可以提高模型的性能,因为移除潜在的异常值后,模型更有可能表现出色。一种基本的异常检测方法是四分位数范围(IQR),它利用四分位数和四分位距来识别数据中的潜在异常值。其他算法包括孤立森林、COPOD、基于KNN的异常检测、自编码器、LOF等,这些算法主要用于非时间序列的异常检测。

为什么需要针对时间序列数据的特定异常检测算法?要回答这个问题,需要理解时间序列数据的概念。时间序列数据是严格有序的,并且具有自相关性,这意味着数据中的观察值依赖于它们之前的观察值。如果使用标准算法来寻找时间序列数据中的异常值,可能会得到错误的预测。因此,时间序列数据必须被特别对待。

检测时间序列数据中的异常值的步骤包括:

  • 进行ADF检验以检查数据是否稳定。如果数据不稳定,则将数据转换为稳定数据。
  • 将数据转换为稳定数据后,拟合一个时间序列模型来模拟数据之间的关系。
  • 为每个观察值找到平方残差误差,并为这些平方误差找到一个阈值。
  • 任何观察值的平方误差超过阈值都可以被标记为异常。

这种方法的原因是,如前所述,时间序列数据是严格有序的并且包含自相关性。通过上述方法,模型将找到数据的一般行为。正常数据的预测误差将比异常数据的预测误差小得多。

单变量时间序列异常检测与多变量时间序列异常检测:单变量时间序列数据只包含一列和与之相关的时间戳。使用AR(自回归)、MA(移动平均)、ARMA(自回归移动平均)和ARIMA(自回归积分移动平均)等算法来模拟与数据的关系。多变量时间序列数据包含多于一列和与之相关的时间戳。使用VAR(向量自回归)、VMA(向量移动平均)、VARMA(向量自回归移动平均)、VARIMA(向量自回归积分移动平均)和VECM(向量误差修正模型)等算法。

将使用Kaggle上的占用率数据来进行多变量时间序列异常检测。数据包含日期、温度、湿度、光照、二氧化碳、湿度比率和占用率等列。

def test_stationarity(ts_data, column='', signif=0.05, series=False): if series: adf_test = adfuller(ts_data, autolag='AIC') else: adf_test = adfuller(ts_data[column], autolag='AIC') p_value = adf_test[1] if p_value <= signif: test_result = "Stationary" else: test_result = "Non-Stationary" return test_result import pandas as pd occ_data = pd.read_csv('datatest.txt') occ_data = occ_data.set_index('date') print(occ_data.head())

首先,将使用ADF(增强迪基-富勒)检验来检查数据的每一列是否稳定。ADF检验提供了一个p值,可以用它来确定数据是否稳定。如果p值小于显著性水平,则数据是稳定的,否则数据是非稳定的。

adf_test_results = { col: test_stationarity(occ_data, col) for col in occ_data.columns } adf_test_results # 输出 # {'Temperature': 'Non-Stationary', # 'Humidity': 'Non-Stationary', # 'Light': 'Non-Stationary', # 'CO2': 'Non-Stationary', # 'HumidityRatio': 'Non-Stationary', # 'Occupancy': 'Non-Stationary'}

对数据的每一列都进行了ADF检验。测试结果表明,数据的所有列都是非稳定的。因此,需要将非稳定数据转换为稳定数据。将非稳定数据转换为稳定数据有多种方法,如差分、对数变换和季节性分解。在这篇文章中,将使用差分来将数据转换为稳定数据。

def differencing(data, column, order): differenced_data = data[column].diff(order) differenced_data.fillna(differenced_data.mean(), inplace=True) return differenced_data for col in occ_data.columns: occ_data[col] = differencing(occ_data, col, 1)

上述代码对每一列执行了一阶差分操作。现在,已经对数据进行了一阶差分。让检查数据是否已成为稳定数据。

adf_test_results = { col: test_stationarity(occ_data, col) for col in occ_data.columns } adf_test_results # 输出 # {'Temperature': 'Stationary', # 'Humidity': 'Stationary', # 'Light': 'Stationary', # 'CO2': 'Stationary', # 'HumidityRatio': 'Stationary', # 'Occupancy': 'Stationary'}

现在,数据的所有列都已成为稳定数据。如果差分操作没有将数据转换为稳定数据,可以尝试使用对数变换和季节性分解将数据转换为稳定数据。现在可以拟合一个时间序列模型来模拟数据之间的关系。这里将使用VAR(向量自回归)模型。

from statsmodels.tsa.api import VAR max_lag = 20 var_model = VAR(occ_data) # 选择最佳滞后阶数 lag_results = var_model.select_order(max_lag) selected_lag = lag_results.aic print(selected_lag) # selected_lag = 13

VAR的select_order方法用于找到数据的最佳滞后阶数。在例子中,最佳滞后阶数是13,它为模型提供了最小的AIC值。VAR模型使用数据的每一列的滞后作为特征,并将提供的数据列作为目标。VAR模型将拟合生成的特征,并使用每一列数据作为目标分别拟合最小二乘或线性回归。如前所述,使用这种方法的原因是数据中存在自相关性。如果直接使用线性回归来模拟这一点,最终会导致残差的自相关性,从而导致错误的预测。

def find_anomalies(squared_errors): threshold = np.mean(squared_errors) + np.std(squared_errors) predictions = (squared_errors >= threshold).astype(int) return predictions, threshold var = VAR(occ_data) var_fitresults = var.fit(selected_lag) squared_errors = var_fitresults.resid.sum(axis=1) ** 2 predictions, threshold = find_anomalies(squared_errors) # threshold = 7593.829254818655

现在,使用选定的滞后拟合VAR模型,并找到数据的平方误差。然后使用平方误差来找到阈值,超过该阈值的观察值被认为是异常值。

threshold = mean(squared_errors) + z * standard_deviation(squared_errors) data = occ_data.iloc[selected_lag:, :] data['Predictions'] = predictions.values data
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485