一.目的
时间序列是数据的一种常见表示形式,对于处理时间序列来说,一个普遍的任务就是比较两个序列的相似性。但是在实际问题中,大部分时间序列都是不等长的,有的序列可能波形类似,但是时间错位。如下图所示,虚线与实线波形是类似的,但是波峰错位,这种情况下,如果直接使用欧式距离来表示时间序列的相似性显然不合理。DTW出现的目的很简单,就是为了合理的衡量两条时间序列的距离(或者说叫相似性),而且不要求两条序列长度相等。
二.简单理解DTW
简单来说,DTW就是捏住两条序列的头和尾,疯狂的蹂躏它们,拉伸短的,压缩长的,使得它们长度对齐。可是落实到实际序列点上怎么才能对齐呢?其实就是让序列点间发生一对多或者多对一的对应关系。如上图所示,每个红圈点都对应了很多个点,这样就可以达到序列对齐的目的。
现在,问题就转化成为了,如何寻找两条序列上点的对应关系,且满足所有对应点间的距离和最小。这是一个很明显的优化问题。
三.求解DTW
假设我们有两个时间序列A和B,他们的长度分别是n和m:
A= a1, a2,…, aj,…, am ;
B = b1, b2,…,bi,…, bn ;
为了对齐这两个序列,我们需要构造一个下图所示的n x m的矩阵网格,矩阵元素(i, j)表示ai和bj两个点的距离dij(也就是序列A的每一个点和B的每一个点之间的相似度,距离越小则相似度越高。),一般采用欧式距离,dij=(ai−bj)**2 ,也可以采用绝对值距离 Abs(ai-bj)。
现在问题转化成了,从左下角(1,1)位置开始,到(n,m)点,找到一条距离和最短的路径。
那么这条路径我们怎么找到呢?哪条路径才是最好的呢?
我们把这条路径定义为规整路径,并用W来表示,这条路径不是随意选择的,需要满足以下几个约束:
1. 边界条件:
因为我们需要将两条序列对齐,所以路径的起点必须是(1,1),终点必须是(n,m)。
2. 连续性:
如果路径W上的节点wk-1= (a’, b’),那么对于路径的下一个点wk=(a, b)需要满足 (a-a’) <=1和 (b-b’) <=1。也就是不可能跨过某个点去匹配,只能和自己相邻的点对齐。这样可以保证A和B中的每个坐标都在W中出现。
3. 单调性:
如果wk-1= (a’, b’),那么对于路径的下一个点wk=(a, b)需要满足0<=(a-a’)和0<= (b-b’)。这限制W上面的点必须是随着时间单调进行的。
结合连续性和单调性约束,每一个格点的路径就只有三个方向了。例如如果路径已经通过了格点(i, j),那么下一个通过的格点只可能是下列三种情况之一:(i+1, j),(i, j+1)或者(i+1, j+1)。
这是一个显然的动态规划策略,假设我们要求到位置(i,j)的最小累计距离D(i,j),那么它只能由: D(i−1,j) ; D(i,j−1); D(i,j) 这三个位置的最小累计距离中寻找,也就是:
其中边界,D(0,0)=0; D(-1,0)=∞,D(0,-1)=∞
四. 举个例子
比如说,给定一个样本序列X和比对序列Y:
X:3,5,6,7,7,1
Y:3,6,6,7,8,1,1
DTW首先会根据序列点之间的距离(这里使用绝对距离),获得一个序列距离矩阵 M(序列中任意两点间的距离都要计算)。
然后根据距离矩阵,使用DP算法的递推关系式,得到累计距离矩阵(或称代价矩阵)D:
最后,两个序列的距离,由代价矩阵最后一行最后一列给出,在这里也就是2。
五. 代码
import numpy as np
def DTW_distance(series_a,series_b):
series_a=np.array(series_a)
series_b=np.array(series_b)
len_a=np.size(series_a)
len_b=np.size(series_b)
#计算序列间的绝对距离,距离矩阵M
ABS_distance={}
for i in range(len_a):
for j in range(len_b):
ABS_distance[(i,j)]=np.abs(series_a[i]-series_b[j])
#利用DP算法求解最优路径
#由于数组从0开始,元素必定从(0,0)位置出发,初始位置的代价就是其本身的距离
D={(0,0):ABS_distance[(0,0)]}
#首先求解第一行与第一列的累计距离(代价),因为他们的上一步唯一确定
for i in range(1,len_a):
D[(i,0)]=ABS_distance[(i,0)]+D[(i-1,0)]
for i in range(1,len_b):
D[(0,i)]=ABS_distance[(0,i)]+D[(0,i-1)]
#求解其他位置
for i in range(1,len_a):
for j in range(1,len_b):
D[(i,j)]=ABS_distance[(i,j)]+min(D[(i-1,j),D[(i,j-1),D[(i-1,j-1)])
return D
参考:
https://blog.csdn.net/qq_39516859/article/details/81705010
https://blog.csdn.net/raym0ndkwan/article/details/45614813