PhysioNet - Computing in Cardiology Challenge 2017

Wprowadzenie

15 stycznia 2017, ramach projektu PhysioNet, został zorganizowany konkurs, którego celem było stworzenie modelu klasyfikującego krótkie nagrania sygnału elektrokardiograficznego.

16 czerwca 2018 grupa MLGdańsk zorganizowała warsztato-hakaton, w trakcie którego uczestnicy mogli podjąć zadanie z konkursu PhysioNet.

  • Zestaw danych zawiera 8528 nagrań EKG o długości od 9 do 60s.
  • Wszystkie dane były próbkowane z częstotliwością 300Hz.
  • Nagrania EKG dzielą się na:
    • normalne (N, 5154),
    • z arytmią (A, 771),
    • z innym zaburzeniem (O, 2557),
    • zaszumione (~, 46).

Poniższy opis przedstawia próbę rozwiązania problemu rozróżniania pomiędzy sygnałami klasy N i A.

Rozwiązanie

Przygotowanie danych

W ramach przygotowania danych:

  • Ze zbioru wszystkich danych zostaną wyodrębnienie pliki klas N i A.
  • Ze zbiorów plików klas N i A zostanie losowo wybranych po 500 plików.

Przyjmijmy, że:

  • Katalog z danymi do analizy nazywa się ecg.

  • Plik annotations.csv zawiera informacje o klasach danych w plikach. Format pliku:

    ref annotation
    A00001 N
    A00002 N
    A00003 N
    A00004 A
    ... ...
import pandas as pd
import os
import re
import random
from shutil import copyfile

def extract_data(df):
    os.makedirs("A", exist_ok=True)
    os.makedirs("N", exist_ok=True)
    for f in os.listdir("ecg"):
        pattern = r".*?(\d+).csv"
        m = re.match(pattern=pattern, string=f)
        index = int(m.groups()[0])-1
        annotation = df["annotation"][index]
        src = os.path.join("ecg", f)
        dst = os.path.join(annotation, f)
        if annotation in "AN":
            copyfile(src, dst)

def select_random(src_dir, sample_size):
    dst_dir = "{}_{}".format(src_dir, sample_size)
    os.makedirs(dst_dir, exist_ok=True)
    files = os.listdir(src_dir)
    random_files = random.sample(files, sample_size)
    for f in random_files:
        src = os.path.join(src_dir, f)
        dst = os.path.join(dst_dir, f)
        copyfile(src, dst)

df = pd.read_csv("annotations.csv")
extract_data(df)
select_random(src_dir="A", sample_size=500)
select_random(src_dir="N", sample_size=500)

Opcjonalnie można wygenerować wykresy dla wybranych plików. Na każdym wykresie zostały zaznaczone miejsca, w których został wykryty, charakterystycznych dla sygnału EKG, pik “R”.

../../../../_images/ecg_peak_names.gif

Nazwy pików w syngale EKG.

def generate_plots(src_dir, sampling_rate):
    dst_dir = "{}_F".format(src_dir)
    os.makedirs(dst_dir, exist_ok=True)

    files = os.listdir(src_dir)
    for f in files:
        f_name, _ = os.path.splitext(f)
        src = os.path.join(src_dir, f)
        dst = os.path.join(dst_dir, "{}.png".format(f_name))

        df = pd.read_csv(src)
        m = df["signal"].values
        r_peaks = ecg.christov_segmenter(signal=m, sampling_rate=sampling_rate)
        ax = df.plot()
        for r_peak in r_peaks[0]:
            ax.axvline(x=r_peak, color='r', linestyle='--')
        plt.savefig(dst)
        plt.close()

generate_plots(src_dir="A_500", sampling_rate=300)
generate_plots(src_dir="N_500", sampling_rate=300)
../../../../_images/signal.png

Przykładowy wykres przedstawiający normalny rytm EKG.

Wydobycie cech

Cechą, która zostanie wydobyta z sygnału EKG jest odchylenie standardowe długości odcinków pomiędzy pikami “R”.

from pathlib import Path
import biosppy
import numpy as np
import pandas as pd

BASE_DIR = Path(".")
LABELS = ["N", "A"]

labels = []
std_devs = []
for label in LABELS:
    dir_name = "{}_500".format(label)
    dir_path = BASE_DIR / dir_name
    for file_ in dir_path.iterdir():
        ecg_csv = pd.read_csv(file_)
        r_peaks = biosppy.signals.ecg.christov_segmenter(
            signal=ecg_csv.signal.values,
            sampling_rate=300)[0]
        diff = pd.Series(r_peaks).diff().dropna()
        std_dev = np.std(diff)
        std_devs.append(std_dev)
        labels.append(label)
df = pd.DataFrame(data={"label": labels, "std_dev": std_devs})
df.to_csv("data_set.csv", index=False)

Klasyfikacja

Plik data_set.csv zawiera kolumny:

  • std_dev - Odchylenie standardowe liczby próbek przypadającej na segmenty sygnału pomiędzy pikami “R”.
  • label - Klasa sygnału.
from sklearn.svm import SVC
from sklearn.model_selection import RepeatedKFold

import pandas as pd


df = pd.read_csv("data_set.csv")
df.dropna(inplace=True)

X = df[["std_dev"]].values
y = df["label"].values

rkf = RepeatedKFold(n_splits=10, n_repeats=10)
scores = []
for train_index, test_index in rkf.split(X):
      X_train, X_test = X[train_index], X[test_index]
      y_train, y_test = y[train_index], y[test_index]

      clf = SVC()
      clf.fit(X_train, y_train)
      score = clf.score(X_test, y_test)

      scores.append(score)

print(pd.Series(data=scores).describe())

Uzyskaliśmy średnią skuteczność klasyfikacji na poziomie 79,41% (± 0,03%).

count    100.000000
mean       0.794180
std        0.035272
min        0.687500
25%        0.770833
50%        0.793814
75%        0.812983
max        0.885417