#following examples
#taken from https://dtaidistance.readthedocs.io/en/latest/usage/dtw.html
#note: there are several other libraries for dtw that you can use instead
from dtaidistance import dtw
from dtaidistance import dtw_visualisation as dtwvis
import numpy as np
import matplotlib.pyplot as plt
# Two short example series of equal length, stored as floats.
s1 = np.array([0, 0, 1, 2, 1, 0, 1, 0, 0, 2, 1, 0, 0], dtype=float)
s2 = np.array([0, 1, 2, 3, 1, 0, 0, 0, 2, 1, 0, 0, 0], dtype=float)

# Compute the warping path between the two series and draw it.
path = dtw.warping_path(s1, s2)
dtwvis.plot_warping(s1, s2, path)

# Report the DTW distance between the same two series.
distance = dtw.distance(s1, s2)
print("DTW distance=", distance)
# Output:
#   DTW distance= 1.4142135623730951
# warping_paths() yields a pair: the first item is kept as d, the second as paths.
d, paths = dtw.warping_paths(s1, s2)

# Pick the best path out of paths and draw it together with both series.
best_path = dtw.best_path(paths)
dtwvis.plot_warpingpaths(s1, s2, paths, best_path)
# Output:
#   (<Figure size 720x720 with 4 Axes>, [<AxesSubplot:>, <AxesSubplot:>, <AxesSubplot:>, <AxesSubplot:>])
# 100 evenly spaced sample points on [0, 20].
time = np.linspace(0, 20, 100)

# Two sine waves over those points: amplitude 5, and amplitude 2 with the argument shifted by 1.
s1 = np.sin(time) * 5
s2 = np.sin(time + 1) * 2

# Warping path between the two waves, drawn on top of them.
path = dtw.warping_path(s1, s2)
dtwvis.plot_warping(s1, s2, path)

# DTW distance between the two waves.
distance = dtw.distance(s1, s2)
print("DTW distance=", distance)
# Output:
#   DTW distance= 18.594164072887992
#UCI Human Activity Recognition dataset
#available at https://archive.ics.uci.edu/ml/datasets/Human+Activity+Recognition+Using+Smartphones
#extract data from UCI_HAR in a local folder so that the following statements work
#following code to parse data was modified from https://nbviewer.jupyter.org/github/markdregan/K-Nearest-Neighbors-with-Dynamic-Time-Warping/blob/master/K_Nearest_Neighbor_Dynamic_Time_Warping.ipynb
# Read the four UCI HAR text files into numpy arrays.
#
# Each file is opened in a ``with`` block so its handle is closed as soon as
# it has been read. The *_file names stay bound afterwards, but they refer to
# closed files.

# Feature files: every line is a whitespace-separated list of floats.
with open('./UCI_HAR_Dataset/train/X_train.txt', 'r') as x_train_file:
    x_train = [[float(ts) for ts in line.split()] for line in x_train_file]
with open('./UCI_HAR_Dataset/test/X_test.txt', 'r') as x_test_file:
    x_test = [[float(ts) for ts in line.split()] for line in x_test_file]

# Label files: one integer class id per line.
with open('./UCI_HAR_Dataset/train/y_train.txt', 'r') as y_train_file:
    y_train = [int(line.rstrip('\n')) for line in y_train_file]
with open('./UCI_HAR_Dataset/test/y_test.txt', 'r') as y_test_file:
    y_test = [int(line.rstrip('\n')) for line in y_test_file]

# Mapping table for classes
labels = {1:'WALKING', 2:'WALKING UPSTAIRS', 3:'WALKING DOWNSTAIRS',
          4:'SITTING', 5:'STANDING', 6:'LAYING'}

# Convert to numpy for efficiency
x_train = np.array(x_train)
y_train = np.array(y_train)
x_test = np.array(x_test)
y_test = np.array(y_test)
plt.figure(figsize=(11, 7))
colors = ['#D62728', '#2C9F2C', '#FD7F23', '#1F77B4', '#9467BD',
          '#8C564A', '#7F7F7F', '#1FBECF', '#E377C2', '#BCBD27']

# One panel per selected training row, placed on a 3x2 grid of subplots.
for i, r in enumerate([0, 27, 65, 100, 145, 172]):
    activity = y_train[r]
    plt.subplot(3, 2, i + 1)
    # Only the first 100 values of the row are drawn. The colour is looked up
    # by label id minus one because the ids in `labels` start at 1.
    plt.plot(x_train[r][:100], label=labels[activity],
             color=colors[activity - 1], linewidth=2)
    plt.xlabel('Samples @50Hz')
    plt.legend(loc='upper left')
    # NOTE(review): assumed to be part of the loop body (the indentation of
    # this file was lost); the final layout should be the same if it runs
    # once after the loop instead. Confirm.
    plt.tight_layout()
#have a look at the first series from test-set with label STANDING
idx = 0
# Open a fresh figure so that, when run as a script, this plot is not drawn
# into the last subplot of the 3x2 grid created before it.
plt.figure()
# Both the legend text and the line colour are looked up from the TEST label
# of this sample (the colour used to be indexed with y_train[idx], which
# belongs to an unrelated training row).
plt.plot(x_test[idx][:100], label=labels[y_test[idx]], color=colors[y_test[idx]-1], linewidth=2)
plt.xlabel('Samples @50Hz')
plt.legend(loc='upper left')
plt.tight_layout()
# Compare the chosen test sample against the same six training rows that were
# plotted, printing the DTW distance and the label of each, to try to guess
# the test sample's label.
for r in (0, 27, 65, 100, 145, 172):
    distance = dtw.distance(x_test[idx], x_train[r])
    message = 'test series-%d has DTW distance=%2.2f from train-%d with label %s'% (
        idx, distance, r, labels[y_train[r]])
    print(message)
# Output:
#   test series-0 has DTW distance=5.14 from train-0 with label STANDING
#   test series-0 has DTW distance=4.58 from train-27 with label SITTING
#   test series-0 has DTW distance=4.60 from train-65 with label LAYING
#   test series-0 has DTW distance=6.89 from train-100 with label WALKING
#   test series-0 has DTW distance=6.82 from train-145 with label WALKING DOWNSTAIRS
#   test series-0 has DTW distance=5.78 from train-172 with label WALKING UPSTAIRS
#ooops didn't work
#note that smallest distance comes from a series labeled as SITTING
#lets find the k - nn
#using a larger sample
#we will return the label
#indicated by the majority of the k-nns
from random import sample

# Sample=False: use every series in x_train.
# Sample=True: use a smaller random sample of 100 series instead.
Sample = False

# n is the number of training series that will be compared against.
n = 100 if Sample else x_train.shape[0]
# idxs holds the rows of x_train to use: n randomly drawn rows, or all of them in order.
idxs = sample(range(0, x_train.shape[0]), n) if Sample else range(0, x_train.shape[0])
# Note: this rebinds the name `time`, which earlier in this file held the
# np.linspace sample points of the sine-wave example.
import time

t_start = time.perf_counter()
# note: window = 10 is used to expedite processing. See documentation of dtw.distance()
# One distance per entry of idxs, in the same order as idxs.
distances = [
    dtw.distance(x_test[idx], x_train[train_row], window=10, use_pruning=True)
    for train_row in idxs
]
t_end = time.perf_counter()
# Start reading, end reading, and elapsed value of the performance counter.
print(t_start, t_end, t_end - t_start)
# Output:
#   3572.969679151 3693.087333251 120.11765409999998
# Let's see the 20 closest matches.
k = 20
# Positions in `distances`, ordered from the smallest distance to the largest
# (equal distances keep their original order), cut down to the first k.
NN = sorted(range(len(distances)), key=distances.__getitem__)[:k]
# Print their indices.
print(NN)
# Output:
#   [848, 4535, 2704, 692, 849, 5272, 990, 6285, 4178, 522, 3605, 6243, 4534, 6244, 359, 6279, 875, 3237, 5273, 6078]
# Tally how many of the k nearest neighbours carry each label, printing the
# label of every neighbour along the way.
counters = {label: 0 for label in labels.values()}

# c is the 1-based rank of the neighbour, r its position in `distances`.
for c, r in enumerate(NN, start=1):
    # `distances` was filled in the order of `idxs`, so r is a position in
    # `idxs`, not necessarily a row of y_train. Translating through idxs keeps
    # the label correct when a random sample is used; when all series are
    # used, idxs[r] == r and nothing changes.
    label = labels[y_train[idxs[r]]]
    counters[label] += 1
    print('NN(%d) has label %s' % (c, label))

print('Labels withing top-%2d NN:' % (k))
# Only labels that occur at least once among the neighbours are listed.
for label in labels.values():
    if counters[label] > 0:
        print(label, ':', counters[label])
#note that the majority of the NN-20 set have label=STANDING
# Output:
#   NN(1) has label STANDING
#   NN(2) has label STANDING
#   NN(3) has label STANDING
#   NN(4) has label STANDING
#   NN(5) has label STANDING
#   NN(6) has label STANDING
#   NN(7) has label STANDING
#   NN(8) has label STANDING
#   NN(9) has label STANDING
#   NN(10) has label STANDING
#   NN(11) has label STANDING
#   NN(12) has label STANDING
#   NN(13) has label STANDING
#   NN(14) has label STANDING
#   NN(15) has label STANDING
#   NN(16) has label STANDING
#   NN(17) has label SITTING
#   NN(18) has label STANDING
#   NN(19) has label STANDING
#   NN(20) has label STANDING
#   Labels withing top-20 NN:
#   SITTING : 1
#   STANDING : 19