Applications of Deep Learning in Medical Diagnosis

COVID-19 is a disease caused by the SARS-CoV-2 virus, which was first detected and identified in Wuhan, China, in December 2019. Despite the lockdown of Wuhan and its surrounding areas, the virus spread rapidly and mutated, leading to one of the most severe humanitarian crises of the modern world and affecting millions of people across the globe.

The virus's rapid spread and mutation produced multiple waves of the pandemic, hitting third-world and developing countries the hardest. Even as governments around the world struggled to contain transmission, the number of people affected continued to climb.


In this article we will use the CoronaHack-Chest X-Ray dataset, which contains chest X-ray images, and our task is to identify the scans affected by the coronavirus.

SARS-CoV-2 primarily attacks the respiratory system, which makes the chest X-ray one of the key imaging modalities for identifying affected lungs. Below is a comparison of COVID-19 pneumonia with bacterial and viral pneumonia:

(Figure: chest X-rays comparing COVID-19, bacterial, and viral pneumonia; source: Transfer Learning for COVID-19 Pneumonia Detection and Classification in Chest X-ray Images)

As the comparison shows, COVID-19 pneumonia can engulf the entire lung, making it more dangerous than bacterial or viral pneumonia. The paper referenced above, Transfer Learning for COVID-19 Pneumonia Detection and Classification in Chest X-ray Images, is strongly recommended reading.

In this article we will use deep learning and transfer learning to classify chest X-ray images and identify lungs affected by COVID-19.

Importing Libraries and Loading the Data

import os
import warnings

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg  # was mistakenly imported as pyplot in the original
import seaborn as sns
import PIL.Image
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from tensorflow.keras.applications import DenseNet121, VGG19, ResNet50
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array
from tensorflow.keras.preprocessing import image
from sklearn.utils import shuffle
from tqdm import tqdm

%matplotlib inline
sns.set()
warnings.filterwarnings("ignore")

We read the dataset's metadata with Pandas and inspect its shape, the first few rows, and the column info.

train_df = pd.read_csv('../input/coronahack-chest-xraydataset/Chest_xray_Corona_Metadata.csv')
train_df.shape
train_df.head(5)
train_df.info()

Handling Missing Values

We check the dataset for missing values and fill them with 'unknown'.

missing_vals = train_df.isnull().sum()
missing_vals.plot(kind='bar')

train_df = train_df.dropna(how='all')  # drop fully empty rows; the original call discarded its result
train_df.isnull().sum()
train_df.fillna('unknown', inplace=True)
train_df.isnull().sum()

Next we split the dataset into training and test sets, asserting that their row counts add up to the size of the original dataset.

train_data = train_df[train_df['Dataset_type'] == 'TRAIN']
test_data = train_df[train_df['Dataset_type'] == 'TEST']
assert train_data.shape[0] + test_data.shape[0] == train_df.shape[0]
print(f"Shape of train data : {train_data.shape}")
print(f"Shape of test data : {test_data.shape}")

Displaying Images

Let's display a few sample images from the training and test sets.

test_img_dir = '/kaggle/input/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/test'
train_img_dir = '/kaggle/input/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train'

sample_train_images = list(os.walk(train_img_dir))[0][2][:8]
sample_train_images = list(map(lambda x: os.path.join(train_img_dir, x), sample_train_images))

sample_test_images = list(os.walk(test_img_dir))[0][2][:8]
sample_test_images = list(map(lambda x: os.path.join(test_img_dir, x), sample_test_images))

plt.figure(figsize=(10, 10))
for iterator, filename in enumerate(sample_train_images):
    img = PIL.Image.open(filename)  # renamed from 'image' to avoid shadowing the keras 'image' module used later
    plt.subplot(4, 2, iterator + 1)
    plt.imshow(img, cmap=plt.cm.bone)
plt.tight_layout()

Visualization

We use Seaborn to visualize the distribution of labels in the dataset.

plt.figure(figsize=(15, 10))
sns.countplot(train_data['Label_2_Virus_category']);

For the COVID-19 cases, we display each image alongside its pixel-intensity histogram.

fig, ax = plt.subplots(4, 2, figsize=(15, 10))

covid_path = train_data[train_data['Label_2_Virus_category'] == 'COVID-19']['X_ray_image_name'].values
sample_covid_path = covid_path[:4]
sample_covid_path = list(map(lambda x: os.path.join(train_img_dir, x), sample_covid_path))

for row, file in enumerate(sample_covid_path):
    img = plt.imread(file)  # 'img' rather than 'image', again to avoid shadowing the keras module
    ax[row, 0].imshow(img, cmap=plt.cm.bone)
    ax[row, 1].hist(img.ravel(), 256, [0, 256])
    ax[row, 0].axis('off')
    if row == 0:
        ax[row, 0].set_title('Images')
        ax[row, 1].set_title('Histograms')

fig.suptitle('Label 2 Virus Category = COVID-19', size=16)
plt.show()

We do the same for the normal cases.

fig, ax = plt.subplots(4, 2, figsize=(15, 10))

normal_path = train_data[train_data['Label'] == 'Normal']['X_ray_image_name'].values
sample_normal_path = normal_path[:4]
sample_normal_path = list(map(lambda x: os.path.join(train_img_dir, x), sample_normal_path))

for row, file in enumerate(sample_normal_path):
    img = plt.imread(file)
    ax[row, 0].imshow(img, cmap=plt.cm.bone)
    ax[row, 1].hist(img.ravel(), 256, [0, 256])
    ax[row, 0].axis('off')
    if row == 0:
        ax[row, 0].set_title('Images')
        ax[row, 1].set_title('Histograms')

fig.suptitle('Label = NORMAL', size=16)
plt.show()

Data Augmentation

We use ImageDataGenerator to augment the dataset.
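The code from this point on references final_train_data, final_test_data, and a binary target column that the original notebook constructs in steps not shown in the article. Below is a minimal sketch of how they might be built, assuming a two-class subset of normal and COVID-19 scans; the filtering logic and the target encoding are assumptions, not the author's exact code:

# Assumed reconstruction (not shown in the article): keep only normal and
# COVID-19 scans, and encode a binary target (1 = COVID-19, 0 = normal).
def make_binary_subset(df):
    mask = (df['Label'] == 'Normal') | (df['Label_2_Virus_category'] == 'COVID-19')
    subset = df[mask].reset_index(drop=True)
    subset['target'] = (subset['Label_2_Virus_category'] == 'COVID-19').astype(np.int64)
    return subset

final_train_data = make_binary_subset(train_data)
final_test_data = make_binary_subset(test_data)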

datagen = ImageDataGenerator(shear_range=0.2, zoom_range=0.2)

def read_img(filename, size, path):
    img = image.load_img(os.path.join(path, filename), target_size=size)
    img = image.img_to_array(img) / 255
    return img

# the original referenced an undefined 'train_img_path'; train_img_dir is the directory defined earlier
samp_img = read_img(final_train_data['X_ray_image_name'][0], (255, 255), train_img_dir)

plt.figure(figsize=(10, 10))
plt.suptitle('Data Augmentation', fontsize=28)

i = 0
for batch in datagen.flow(tf.expand_dims(samp_img, 0), batch_size=6):
    plt.subplot(3, 3, i + 1)
    plt.grid(False)
    plt.imshow(batch.reshape(255, 255, 3))
    if i == 8:
        break
    i += 1
plt.show()
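The tensor-conversion step below also relies on with_corona_augmented, a list of augmented COVID-19 images generated to offset the heavy class imbalance (COVID-19 positives are far rarer than normal scans). Its construction is not shown in the article either; here is a plausible sketch, where the tenfold oversampling factor is an assumption:

# Assumed reconstruction: oversample each COVID-19 positive with augmented variants.
corona_df = final_train_data[final_train_data['target'] == 1]

with_corona_augmented = []
for name in tqdm(corona_df['X_ray_image_name'].values):
    img = read_img(name, (255, 255), train_img_dir)
    for i, batch in enumerate(datagen.flow(np.expand_dims(img, 0), batch_size=1)):
        with_corona_augmented.append(batch.reshape(255, 255, 3))
        if i >= 9:  # 10 augmented copies per positive image -- an assumed factor
            break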

Converting to Tensors

We convert all the data to tensors so it can be consumed by TensorFlow.

train_arrays = []
# apply() is used here purely for its side effect of filling the list
final_train_data['X_ray_image_name'].apply(lambda x: train_arrays.append(read_img(x, (255, 255), train_img_dir)))

test_arrays = []
final_test_data['X_ray_image_name'].apply(lambda x: test_arrays.append(read_img(x, (255, 255), test_img_dir)))

print(len(train_arrays))
print(len(test_arrays))

# the augmented images are all COVID-19 positives, hence the appended ones
y_train = np.concatenate((np.int64(final_train_data['target'].values),
                          np.ones(len(with_corona_augmented), dtype=np.int64)))

train_tensors = tf.convert_to_tensor(np.concatenate((np.array(train_arrays),
                                                     np.array(with_corona_augmented))))
test_tensors = tf.convert_to_tensor(np.array(test_arrays))
y_train_tensor = tf.convert_to_tensor(y_train)
y_test_tensor = tf.convert_to_tensor(final_test_data['target'].values)

train_dataset = tf.data.Dataset.from_tensor_slices((train_tensors, y_train_tensor))
test_dataset = tf.data.Dataset.from_tensor_slices((test_tensors, y_test_tensor))

Generating Batches

We split the datasets into batches for training and evaluation.

BATCH_SIZE = 16
BUFFER = 1000

train_batches = train_dataset.shuffle(BUFFER).batch(BATCH_SIZE)
test_batches = test_dataset.batch(BATCH_SIZE)

for i, l in train_batches.take(1):
    print('Train Shape per Batch: ', i.shape)
for i, l in test_batches.take(1):
    print('Test Shape per Batch: ', i.shape)
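For larger datasets it can help to let tf.data prepare the next batch while the GPU is busy training on the current one; a small optional tweak:

# Optional: overlap input preparation with training (tf.data.AUTOTUNE is available in TF 2.4+)
train_batches = train_dataset.shuffle(BUFFER).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)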

Transfer Learning with ResNet50

We load ResNet50 pre-trained on ImageNet as a frozen feature extractor, attach a small classification head, and stop training early once the validation loss stops improving for two consecutive epochs.

INPUT_SHAPE = (255, 255, 3)

base_model = tf.keras.applications.ResNet50(input_shape=INPUT_SHAPE,
                                            include_top=False,
                                            weights='imagenet')
base_model.trainable = False  # freeze the pre-trained weights

model = Sequential()
model.add(base_model)
model.add(GlobalAveragePooling2D())  # the original 'Layers.' prefix was undefined
model.add(Dense(128))
model.add(Dropout(0.2))
model.add(Dense(1, activation='sigmoid'))
model.summary()

callbacks = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=2)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.fit(train_batches, epochs=10, validation_data=test_batches, callbacks=[callbacks])

# predict_classes was removed in recent TensorFlow releases;
# threshold the sigmoid output instead
pred = (model.predict(np.array(test_arrays)) > 0.5).astype(np.int64)

from sklearn.metrics import classification_report, confusion_matrix

# evaluate against final_test_data, the frame test_arrays was built from
print(classification_report(final_test_data['target'], pred.flatten()))
con_mat = confusion_matrix(final_test_data['target'], pred.flatten())

plt.figure(figsize=(10, 10))
plt.title('CONFUSION MATRIX')
sns.heatmap(con_mat, cmap='cividis',
            yticklabels=['Negative', 'Positive'],
            xticklabels=['Negative', 'Positive'],
            annot=True);
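Once trained, the model can screen an individual X-ray. A minimal sketch, where 'example.jpeg' is a placeholder filename, not a file from the dataset:

# Hypothetical single-image inference; 'example.jpeg' is a placeholder.
sample = read_img('example.jpeg', (255, 255), test_img_dir)
prob = float(model.predict(np.expand_dims(sample, 0))[0][0])
print('COVID-19 positive' if prob > 0.5 else 'Normal', f'(p = {prob:.3f})')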