COVID-19 is caused by the SARS-CoV-2 virus, which was first identified in Wuhan, China, in December 2019. Despite the lockdown of Wuhan and its surrounding region, the virus spread and mutated rapidly, causing one of the most severe humanitarian crises of the modern era and affecting millions of people worldwide.
The rapid spread and mutation of the virus led to multiple waves of infection, hitting third-world and developing countries especially hard. Even as governments around the world worked to contain transmission, the number of people affected kept rising.
In this article, the CoronaHack-Chest X-Ray dataset will be used. It contains chest X-ray images, and the task is to identify which of them show lungs affected by the coronavirus.
SARS-CoV-2 primarily attacks the respiratory system, which makes the chest X-ray one of the key imaging methods for identifying affected lungs. Below is a comparison of COVID-19 pneumonia with bacterial and viral pneumonia:
As can be seen, COVID-19 pneumonia can cover the entire lung, making it more dangerous than bacterial or viral pneumonia. The paper mentioned in the link above, "COVID-19 Pneumonia Detection and Classification in Chest X-ray Images Using Transfer Learning", is strongly recommended reading.
In this article, deep learning and transfer learning will be used to classify and identify chest X-rays of lungs affected by COVID-19.
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
import numpy as np
import pandas as pd
sns.set()
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers as Layers  # referenced as Layers.Dense etc. when building the model
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from tensorflow.keras.applications import DenseNet121, VGG19, ResNet50
import PIL.Image
import matplotlib.image as mpimg
import os
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array
from tensorflow.keras.preprocessing import image
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")
from sklearn.utils import shuffle
The Pandas library will be used to read the dataset's metadata, then inspect its shape and first few rows. The metadata includes, among other columns, the image filename (X_ray_image_name), the split (Dataset_type), and the labels (Label, Label_2_Virus_category).
train_df = pd.read_csv('../input/coronahack-chest-xraydataset/Chest_xray_Corona_Metadata.csv')
train_df.shape
train_df.head(5)
train_df.info()
Check the dataset for missing values, and fill them with 'unknown'.
missing_vals = train_df.isnull().sum()
missing_vals.plot(kind='bar')
train_df = train_df.dropna(how='all')  # drop rows that are entirely empty; assign back so the result is kept
train_df.isnull().sum()
train_df.fillna('unknown', inplace=True)
train_df.isnull().sum()
Split the dataset into training and test sets, and verify that their row counts sum to that of the original dataset.
train_data = train_df[train_df['Dataset_type'] == 'TRAIN']
test_data = train_df[train_df['Dataset_type'] == 'TEST']
assert train_data.shape[0] + test_data.shape[0] == train_df.shape[0]
print(f"Shape of train data : {train_data.shape}")
print(f"Shape of test data : {test_data.shape}")
Some sample images from the training and test sets will be displayed.
test_img_dir = '/kaggle/input/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/test'
train_img_dir = '/kaggle/input/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train'
sample_train_images = list(os.walk(train_img_dir))[0][2][:8]
sample_train_images = list(map(lambda x: os.path.join(train_img_dir, x), sample_train_images))
sample_test_images = list(os.walk(test_img_dir))[0][2][:8]
sample_test_images = list(map(lambda x: os.path.join(test_img_dir, x), sample_test_images))
plt.figure(figsize=(10,10))
for iterator, filename in enumerate(sample_train_images):
    img = PIL.Image.open(filename)  # named 'img' so the keras 'image' module is not shadowed
    plt.subplot(4, 2, iterator + 1)
    plt.imshow(img, cmap=plt.cm.bone)
plt.tight_layout()
The Seaborn library will be used to visualize the label distribution in the dataset.
plt.figure(figsize=(15,10))
sns.countplot(x='Label_2_Virus_category', data=train_data);  # keyword form required by recent seaborn versions
For the COVID-19 cases, the images and their pixel-intensity histograms will be displayed.
fig, ax = plt.subplots(4, 2, figsize=(15, 10))
covid_path = train_data[train_data['Label_2_Virus_category']=='COVID-19']['X_ray_image_name'].values
sample_covid_path = covid_path[:4]
sample_covid_path = list(map(lambda x: os.path.join(train_img_dir, x), sample_covid_path))
for row, file in enumerate(sample_covid_path):
    img = plt.imread(file)  # named 'img' so the keras 'image' module is not shadowed
    ax[row, 0].imshow(img, cmap=plt.cm.bone)
    ax[row, 1].hist(img.ravel(), 256, [0, 256])
    ax[row, 0].axis('off')
    if row == 0:
        ax[row, 0].set_title('Images')
        ax[row, 1].set_title('Histograms')
fig.suptitle('Label 2 Virus Category = COVID-19', size=16)
plt.show()
For the normal cases, the images and histograms will be displayed as well.
fig, ax = plt.subplots(4, 2, figsize=(15, 10))
normal_path = train_data[train_data['Label']=='Normal']['X_ray_image_name'].values
sample_normal_path = normal_path[:4]
sample_normal_path = list(map(lambda x: os.path.join(train_img_dir, x), sample_normal_path))
for row, file in enumerate(sample_normal_path):
    img = plt.imread(file)
    ax[row, 0].imshow(img, cmap=plt.cm.bone)
    ax[row, 1].hist(img.ravel(), 256, [0, 256])
    ax[row, 0].axis('off')
    if row == 0:
        ax[row, 0].set_title('Images')
        ax[row, 1].set_title('Histograms')
fig.suptitle('Label = NORMAL', size=16)
plt.show()
The ImageDataGenerator will be used to augment the dataset.
datagen = ImageDataGenerator(shear_range=0.2, zoom_range=0.2)
def read_img(filename, size, path):
    # Load an image from disk, resize it, and scale pixel values to [0, 1]
    img = image.load_img(os.path.join(path, filename), target_size=size)
    img = image.img_to_array(img) / 255
    return img
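The frames final_train_data and final_test_data used from here on are not defined in this excerpt. A minimal reconstruction, assuming a binary target column with 1 for pneumonia and 0 for normal scans (the construction is an assumption, kept only so the later code runs):
# NOTE: reconstruction of a preprocessing step missing from this excerpt.
# 'target' is assumed to be 1 for pneumonia and 0 for normal scans.
final_train_data = train_data[['X_ray_image_name', 'Label', 'Label_2_Virus_category']].copy().reset_index(drop=True)
final_test_data = test_data[['X_ray_image_name', 'Label', 'Label_2_Virus_category']].copy().reset_index(drop=True)
final_train_data['target'] = (final_train_data['Label'] != 'Normal').astype(np.int64)
final_test_data['target'] = (final_test_data['Label'] != 'Normal').astype(np.int64)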
samp_img = read_img(final_train_data['X_ray_image_name'][0], (255, 255), train_img_dir)
plt.figure(figsize=(10,10))
plt.suptitle('Data Augmentation', fontsize=28)
i = 0
# A single image is flowed, so each yielded batch holds one augmented copy
for batch in datagen.flow(np.expand_dims(samp_img, 0), batch_size=1):
    plt.subplot(3, 3, i + 1)
    plt.grid(False)
    plt.imshow(batch.reshape(255, 255, 3))
    if i == 8:
        break
    i += 1
plt.show()
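The list with_corona_augmented, used below to oversample the under-represented COVID-19 class, is also missing from this excerpt. A plausible sketch, assuming each COVID-19 image is passed through the datagen defined above a fixed number of times (the multiplier is an assumption, not from the original):
# NOTE: reconstruction of the missing oversampling step. Every augmented
# copy is a COVID-19 positive, so all of them receive label 1 further below.
AUG_PER_IMAGE = 10  # assumed multiplier
covid_names = final_train_data[final_train_data['Label_2_Virus_category'] == 'COVID-19']['X_ray_image_name']
with_corona_augmented = []
for name in tqdm(covid_names):
    img = read_img(name, (255, 255), train_img_dir)
    count = 0
    for batch in datagen.flow(np.expand_dims(img, 0), batch_size=1):
        with_corona_augmented.append(batch.reshape(255, 255, 3))
        count += 1
        if count == AUG_PER_IMAGE:
            break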
All the data will now be converted to tensors for use with TensorFlow.
train_arrays = [read_img(name, (255, 255), train_img_dir) for name in tqdm(final_train_data['X_ray_image_name'])]
test_arrays = [read_img(name, (255, 255), test_img_dir) for name in tqdm(final_test_data['X_ray_image_name'])]
print(len(train_arrays))
print(len(test_arrays))
# The augmented images are all COVID-19 positives, so their labels are a block of ones
y_train = np.concatenate((np.int64(final_train_data['target'].values), np.ones(len(with_corona_augmented), dtype=np.int64)))
train_tensors = tf.convert_to_tensor(np.concatenate((np.array(train_arrays), np.array(with_corona_augmented))))
test_tensors = tf.convert_to_tensor(np.array(test_arrays))
y_train_tensor = tf.convert_to_tensor(y_train)
y_test_tensor = tf.convert_to_tensor(final_test_data['target'].values)
train_dataset = tf.data.Dataset.from_tensor_slices((train_tensors, y_train_tensor))
test_dataset = tf.data.Dataset.from_tensor_slices((test_tensors, y_test_tensor))
The dataset will be divided into batches for training and testing.
BATCH_SIZE = 16
BUFFER = 1000
train_batches = train_dataset.shuffle(BUFFER).batch(BATCH_SIZE)
test_batches = test_dataset.batch(BATCH_SIZE)
for imgs, labels in train_batches.take(1):
    print('Train Shape per Batch: ', imgs.shape)
for imgs, labels in test_batches.take(1):
    print('Test Shape per Batch: ', imgs.shape)
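An optional tweak, not part of the original pipeline: letting tf.data prefetch batches overlaps input preparation with training on the accelerator.
# Optional: prefetch so the next batch is prepared while the current one trains
train_batches = train_batches.prefetch(tf.data.AUTOTUNE)
test_batches = test_batches.prefetch(tf.data.AUTOTUNE)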
INPUT_SHAPE = (255,255,3)
base_model = tf.keras.applications.ResNet50(input_shape= INPUT_SHAPE, include_top=False, weights='imagenet')
base_model.trainable = False
model = Sequential()
model.add(base_model)
model.add(Layers.GlobalAveragePooling2D())
model.add(Layers.Dense(128))
model.add(Layers.Dropout(0.2))
model.add(Layers.Dense(1, activation = 'sigmoid'))
model.summary()
callbacks = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=2)
model.compile(optimizer='adam', loss = 'binary_crossentropy', metrics=['accuracy'])
model.fit(train_batches, epochs=10, validation_data=test_batches, callbacks=[callbacks])
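A natural follow-up once the frozen-base model has converged is the standard transfer-learning fine-tuning stage: unfreeze the base network and continue training with a much smaller learning rate. The original stops at the frozen stage, so the following is only a sketch:
# Optional fine-tuning stage (not in the original walkthrough): unfreeze the
# ResNet50 base and retrain end-to-end with a small learning rate so the
# pretrained ImageNet features are adjusted gently rather than overwritten.
base_model.trainable = True
model.compile(optimizer=Adam(learning_rate=1e-5), loss='binary_crossentropy', metrics=['accuracy'])
model.fit(train_batches, epochs=3, validation_data=test_batches, callbacks=[callbacks])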
# model.predict_classes was removed in recent TensorFlow releases; threshold the sigmoid output instead
pred_probs = model.predict(np.array(test_arrays))
pred = (pred_probs > 0.5).astype(np.int64)
from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(final_test_data['target'], pred.flatten()))
con_mat = confusion_matrix(final_test_data['target'], pred.flatten())
plt.figure(figsize = (10,10))
plt.title('CONFUSION MATRIX')
sns.heatmap(con_mat, cmap='cividis', yticklabels=['Negative', 'Positive'], xticklabels=['Negative', 'Positive'], annot=True);
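Because the model outputs sigmoid probabilities, a threshold-free metric such as ROC-AUC complements the thresholded report above; this reuses the pred_probs computed before thresholding:
# ROC-AUC on the raw probabilities (threshold-free view of ranking quality)
from sklearn.metrics import roc_auc_score
print('ROC-AUC:', roc_auc_score(final_test_data['target'], pred_probs.flatten()))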