:GradCAM 기술을 사용하여 병리학적 상태를 예측하기 위해, 이미지의 중요한 영역을 강조하는 히트맵을 생성
: 먼저 작은 Training 셋을 로드하고, 앞서 AUC 측정값이 가장 높은 4개의 클래스를 살펴볼 것
df = pd.read_csv("drive/MyDrive/nih/train-small.csv")
IMAGE_DIR = "drive/MyDrive/nih/images-small/"
# only show the lables with top 4 AUC
labels_to_show = np.take(labels, np.argsort(auc_rocs)[::-1])[:4]
# Expected output
Loading original image
Generating gradcam for class Cardiomegaly
Generating gradcam for class Mass
Generating gradcam for class Pneumothorax
Generating gradcam for class Edema
plt.xticks(rotation=90)
plt.bar(x=labels, height=np.mean(train_generator.labels, axis=0))
plt.title("Frequency of Each Class")
plt.show()
: 위 그림에서 여러 병리에 따라 양성 사례의 유병률이크게 다르다는 것을 알 수 있음
→이러한 경향은 전체 데이터셋의 경향도 반영함
: 이상적으로는 균형 잡힌 데이터셋으로 모델을 훈련시켜, positive 및 negative 사례가 손실에 동등하게 기여하도록 함
Exercise 2 - Computing Class Frequencies
: 아래 함수를 통해, 각 데이터셋의 라벨에 대한 빈도를 계산
def compute_class_freqs(labels):
"""
Compute positive and negative frequences for each class.
Args:
labels (np.array): matrix of labels, size (num_examples, num_classes)
Returns:
positive_frequencies (np.array): array of positive frequences for each
class, size (num_classes)
negative_frequencies (np.array): array of negative frequences for each
class, size (num_classes)
"""
# total number of patients (rows)
N = labels.shape[0]
positive_frequencies = np.sum(labels, axis=0) / labels.shape[0]
negative_frequencies = 1 - positive_frequencies
return positive_frequencies, negative_frequencies
data = pd.DataFrame({"Class": labels, "Label": "Positive", "Value": pos_contribution})
data = data.append([{"Class": labels[l], "Label": "Negative", "Value": v}
for l,v in enumerate(neg_contribution)], ignore_index=True)
plt.xticks(rotation=90)
sns.barplot(x="Class", y="Value", hue="Label" ,data=data);
: 위 그림에서처럼 가중치를 적용되어, 각 클래스의 양수/음수 레이블이 손실 함수에 대해 동일한 기여도를 갖게됨
Exercise 3 - Weighted Loss
:아래 weighted_loss 함수를 작성하여 각 배치의 가중 손실을 계산하는 손실 함수를 반환
: 다중 클래스 손실의 경우 각 개별 클래스의 평균 손실을 합산
def get_weighted_loss(pos_weights, neg_weights, epsilon=1e-7):
"""
Return weighted loss function given negative weights and positive weights.
Args:
pos_weights (np.array): array of positive weights for each class, size (num_classes)
neg_weights (np.array): array of negative weights for each class, size (num_classes)
Returns:
weighted_loss (function): weighted loss function
"""
def weighted_loss(y_true, y_pred):
"""
Return weighted loss value.
Args:
y_true (Tensor): Tensor of true labels, size is (num_examples, num_classes)
y_pred (Tensor): Tensor of predicted labels, size is (num_examples, num_classes)
Returns:
loss (Tensor): overall scalar loss summed across all classes
"""
# initialize loss to zero
loss = 0.0
for i in range(len(pos_weights)):
# for each class, add average weighted loss for that class
loss += -(K.mean( pos_weights[i] * y_true[:,i] * K.log(y_pred[:,i] + epsilon) + \
neg_weights[i] * (1 - y_true[:,i]) * K.log(1 - y_pred[:,i] + epsilon), axis = 0))
return loss
return weighted_loss
: 예제 테스트
sess = K.get_session()
with sess.as_default() as sess:
print("Test example:\n")
y_true = K.constant(np.array(
[[1, 1, 1],
[1, 1, 0],
[0, 1, 0],
[1, 0, 1]]
))
print("y_true:\n")
print(y_true.eval())
w_p = np.array([0.25, 0.25, 0.5])
w_n = np.array([0.75, 0.75, 0.5])
print("\nw_p:\n")
print(w_p)
print("\nw_n:\n")
print(w_n)
y_pred_1 = K.constant(0.7*np.ones(y_true.shape))
print("\ny_pred_1:\n")
print(y_pred_1.eval())
y_pred_2 = K.constant(0.3*np.ones(y_true.shape))
print("\ny_pred_2:\n")
print(y_pred_2.eval())
# test with a large epsilon in order to catch errors
L = get_weighted_loss(w_p, w_n, epsilon=1)
print("\nIf we weighted them correctly, we expect the two losses to be the same.")
L1 = L(y_true, y_pred_1).eval()
L2 = L(y_true, y_pred_2).eval()
print(f"\nL(y_pred_1)= {L1:.4f}, L(y_pred_2)= {L2:.4f}")
print(f"Difference is L1 - L2 = {L1 - L2:.4f}")
# Expected output
Test example:
y_true:
[[1. 1. 1.]
[1. 1. 0.]
[0. 1. 0.]
[1. 0. 1.]]
w_p:
[0.25 0.25 0.5 ]
w_n:
[0.75 0.75 0.5 ]
y_pred_1:
[[0.7 0.7 0.7]
[0.7 0.7 0.7]
[0.7 0.7 0.7]
[0.7 0.7 0.7]]
y_pred_2:
[[0.3 0.3 0.3]
[0.3 0.3 0.3]
[0.3 0.3 0.3]
[0.3 0.3 0.3]]
If we weighted them correctly, we expect the two losses to be the same.
L(y_pred_1)= -0.4956, L(y_pred_2)= -0.4956
Difference is L1 - L2 = 0.0000
3.3 DenseNet121
:다음으로 Keras에서 로드한 후, 두 개의 레이어를 추가 할 수 있는 DenseNet121 모델을 사용
*GlobalAveragePooling2D - DenseNet121에서 마지막 convolution 레이어의 평균을 얻음
*Dense - 각 클래스에 대한 예측 logit을 얻기 위한 sigmoid 기능이 있는 layer
: compile 함수에서 loss 매개 변수를 지정하여, 모델에 대한 사용자 정의 손실 함수를 설정할 수 있음
# create the base pre-trained model
base_model = DenseNet121(weights='drive/MyDrive/nih/densenet.hdf5', include_top=False)
x = base_model.output
# add a global spatial average pooling layer
x = GlobalAveragePooling2D()(x)
# and a logistic layer
predictions = Dense(len(labels), activation="sigmoid")(x)
model = Model(inputs=base_model.input, outputs=predictions)
model.compile(optimizer='adam', loss=get_weighted_loss(pos_weights, neg_weights))
: 예를 들어, 환자가 병원 방문 중 여러 다른 시간에 X-ray 이미지를 촬영한 경우가 이에 해당
: Train, Validation,, Test 데이터셋 사이에 Leakage가 없도록, 데이터 분할이 수행됨
Exercise 1 - Checking Data Leakage : 아래 셀에서 두 데이터셋 사이에 누출이 있는지 확인하는 함수가 작성됨
def check_for_leakage(df1, df2, patient_col):
"""
Return True if there any patients are in both df1 and df2.
Args:
df1 (dataframe): dataframe describing first dataset
df2 (dataframe): dataframe describing second dataset
patient_col (str): string name of column with patient IDs
Returns:
leakage (bool): True if there is leakage, otherwise False
"""
df1_patients_unique = set(df1[patient_col].unique().tolist())
df2_patients_unique = set(df2[patient_col].unique().tolist())
patients_in_both_groups = df1_patients_unique.intersection(df2_patients_unique)
# leakage contains true if there is patient overlap, otherwise false.
leakage = len(patients_in_both_groups) >= 1 # boolean (true if there is at least 1 patient in both groups)
return leakage
: 다음 셀을 실행하여 train, valid, test 데이터셋에 공통된 환자가 있는지 확인
print("leakage between train and test: {}".format(check_for_leakage(train_df, test_df, 'PatientId')))
print("leakage between valid and test: {}".format(check_for_leakage(valid_df, test_df, 'PatientId')))
# Expected output
leakage between train and test: False
leakage between valid and test: False
: 두 가지 모두에 대해 'False'가 출력되면, training을 위한 데이터셋 준비가 완료
2.2 Preparing Images
:데이터셋 분할이 준비됐으므로, 이를 사용하도록 모델 설정을 진행할 수 있음
: 이를 위해, Keras의 ImageDataGenerator를 사용하여 데이터프레임에 지정된 이미지에 대한 'generator'를 빌드
: generator로 각 배치의 값을 변환하여 평균이 0, 표준 편차가 1이 되도록 표준화 함
: 또한, 단일채널 X-ray 이미지 (회색조)를 3채널 형식으로 변환
: 이미지 크기를 320x320 픽셀로 설정
def get_train_generator(df, image_dir, x_col, y_cols, shuffle=True, batch_size=8, seed=1, target_w = 320, target_h = 320):
"""
Return generator for training set, normalizing using batch
statistics.
Args:
train_df (dataframe): dataframe specifying training data.
image_dir (str): directory where image files are held.
x_col (str): name of column in df that holds filenames.
y_cols (list): list of strings that hold y labels for images.
sample_size (int): size of sample to use for normalization statistics.
batch_size (int): images per batch to be fed into model during training.
seed (int): random seed.
target_w (int): final width of input images.
target_h (int): final height of input images.
Returns:
train_generator (DataFrameIterator): iterator over training set
"""
print("getting train generator...")
# normalize images
image_generator = ImageDataGenerator(
samplewise_center=True,
samplewise_std_normalization= True)
# flow from directory with specified batch size
# and target image size
generator = image_generator.flow_from_dataframe(
dataframe=df,
directory=image_dir,
x_col=x_col,
y_col=y_cols,
class_mode="raw",
batch_size=batch_size,
shuffle=shuffle,
seed=seed,
target_size=(target_w,target_h))
return generator
Valid 및 Test 셋을 위한 별도의 생성기 구축
: 이제 데이터 validaiton 및 testing을 위한 새로운 생성기를 구축해야함
: Training 데이터와 동일한 generator를 사용할 수 없는 이유
-Training에서는 배치마다 각 이미지를 정규화하므로 배치 통계를 사용함
-실제 분석에서는 들어오는 이미지를 한 번에 일괄 처리하지 않음 (한번에 하나의 이미지를 처리)
-Training 모델에는 test 데이터셋에 대한 정보가 없어야 함
: 우리가 해야 할 일은 training 셋에서 계산된 통계를 사용하여, 들어오는 test 셋을 정규화하는 것
-아래 함수에서 이를 구현함
-이상적으로는 전체 training 셋을 사용하여 표본 평균과 표준 편차를 계산하고자 할 것
*그러나 이것은 매우 크기 때문에 시간이 많이 소요
-시간을 고려하여 데이터셋의 무작위 샘플을 취하고, 샘플 평균과 샘플 표준 편차를 계산함
def get_test_and_valid_generator(valid_df, test_df, train_df, image_dir, x_col, y_cols, sample_size=100, batch_size=8, seed=1, target_w = 320, target_h = 320):
"""
Return generator for validation set and test test set using
normalization statistics from training set.
Args:
valid_df (dataframe): dataframe specifying validation data.
test_df (dataframe): dataframe specifying test data.
train_df (dataframe): dataframe specifying training data.
image_dir (str): directory where image files are held.
x_col (str): name of column in df that holds filenames.
y_cols (list): list of strings that hold y labels for images.
sample_size (int): size of sample to use for normalization statistics.
batch_size (int): images per batch to be fed into model during training.
seed (int): random seed.
target_w (int): final width of input images.
target_h (int): final height of input images.
Returns:
test_generator (DataFrameIterator) and valid_generator: iterators over test set and validation set respectively
"""
print("getting train and valid generators...")
# get generator to sample dataset
raw_train_generator = ImageDataGenerator().flow_from_dataframe(
dataframe=train_df,
directory=IMAGE_DIR,
x_col="Image",
y_col=labels,
class_mode="raw",
batch_size=sample_size,
shuffle=True,
target_size=(target_w, target_h))
# get data sample
batch = raw_train_generator.next()
data_sample = batch[0]
# use sample to fit mean and std for test set generator
image_generator = ImageDataGenerator(
featurewise_center=True,
featurewise_std_normalization= True)
# fit generator to sample from training data
image_generator.fit(data_sample)
# get test generator
valid_generator = image_generator.flow_from_dataframe(
dataframe=valid_df,
directory=image_dir,
x_col=x_col,
y_col=y_cols,
class_mode="raw",
batch_size=batch_size,
shuffle=False,
seed=seed,
target_size=(target_w,target_h))
test_generator = image_generator.flow_from_dataframe(
dataframe=test_df,
directory=image_dir,
x_col=x_col,
y_col=y_cols,
class_mode="raw",
batch_size=batch_size,
shuffle=False,
seed=seed,
target_size=(target_w,target_h))
return valid_generator, test_generator
: generator 함수가 준비 되었으면 training, validation, testing 데이터셋을 각각 하나씩 생성해봄
# Expected output
getting train generator...
Found 1000 validated image filenames.
getting train and valid generators...
Found 1000 validated image filenames.
Found 200 validated image filenames.
Found 420 validated image filenames.
: '__get_item __ (index)' 함수를 호출하여, generator가 모델에 제공하는 내용을 살펴봄
x, y = train_generator.__getitem__(0)
plt.imshow(x[0]);
# Expected output
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
Chest X-Ray Medical Diagnosis with Deep Learning 시작
: Keras를 사용하여 흉부 X-ray classifier 딥러닝 모델을 생성하여, 의료 영상 진단에 사용해보는 학습 과정
: 모든 분석 과정은 google colab에서 수행되었음
: 실제 X-ray 데이터 전처리부터 모델 평가까지 아래의 과정을 수행
Data preparation
Visualizing data
Preventing data leakage
Model Development
Addressing class imbalance
Leveraging pre-trained models using transfer learning
Evaluation
AUC and ROC curves
① Import Packages and Function
: numpy, pandas - 데이터 전처리, 가공
: matplotlib.pyplot, seaborn - 결과 플롯팅으로 시각화
: util - 로컬로 정의된 유틸리티 기능 제공
: keras - 딥러닝 모델 구축
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from keras.preprocessing.image import ImageDataGenerator
from keras.applications.densenet import DenseNet121
from keras.layers import Dense, GlobalAveragePooling2D
from keras.models import Model
from tensorflow.compat.v1.keras import backend as K
from keras.models import load_model
# 아래 코드를 실행하여 util.py을 업로드
from google.colab import files
src = list(files.upload().values())[0]
open('util.py','wb').write(src)
import util
*ROC(Receiver Operating Characteristic) = 모든 임계값에서 분류 모델의 성능을 보여주는 그래프
*AUC(Area Under the Curve) = ROC 곡선 아래 영역
: AUC가 높다는 사실은 클래스를 구별하는 모델의 성능이 훌륭하다는 것을 의미
: 임상에서 AUC-ROC 곡선은 정상인 및 환자 클래스를 구분하는 모델(ex. 특정 유전자군)의 성능 평가로 흔하게 사용됨
: ROC 곡선은 TPR(=민감도)이 y축에 있고, FPR(=1-specificity)이 x축으로 그려짐
2. AUC - ROC Curve 용어 정리
: Confusion Matrix의 분류 모델 성능에 대한 지표 (TP, FP, TN, FN)
: 사용되는 통계량
ㄱ) Sensitivity(민감도) = 실제로 질병이 있을 때, 검사 결과가 양성인 경우 (true positive rate) = a / (a+b)
ㄴ) Specificity(특이도) = 실제로 질병이 없을 때, 검사 결과가 음성인 경우 (false positive rate) = d / (c+d)
ㄷ) Positive likelihood ratio(양성우도비) = 질병이 있을 때 검사 결과가 양성 확률과 질병이 없을 때 검사 결과가 양성 확률 사이의 비율 = True positive rate / False positive rate = Sensitivity / (1-Specificity)
ㄹ) Negative likelihood ratio(음성우도비) = 질병이 있을 때 검사 결과가 음성 확률과 질병이 없을 때 검사 결과가 음성 확률 사이의 비율 = False negative rate / True negative rate = (1-Sensitivity) / Specificity
ㅁ) Positive predictive value(양성예측도) =검사 결과가 양성일 때, 질병이 실제로 존재할 확률 = a / (a+c)
ㅅ) Negative predictive value(음성예측도) = 검사 결과가 음성일 때, 질병이 실제로 없을 확률 = d / (b+d)
3. 모델의 성능을 측정하는 방법
: 우수한 분류 모델은 AUC 값이 1에 가깝고, 클래스를 분류하는 성능이 뛰어남을 의미
: 반대로, 불량한 분류 모델은 AUC 값이 0에 가깝고, 클래스를 분류하는 성능이 떨어짐을 의미
*실제로는 AUC의 최소값은 0.5으로, 이 경우에 모델의 클래스 분리 능력이 전혀 없음을 뜻함