0%

K-nearest neighbor(K-NN)

基本的分类和回归方法

给定一个训练数据集, 对新的输入实例, 在训练数据集中找到与该实例最邻近的k个实例, 这k个实例的多数属于某个类, 就把该输入实例分为这个类。

1. k 近邻模型

三个基本要素:距离度量,k值选择、分类决策规则

1.1 距离度量

$x_{i}=\left(x_{i}^{(1)}, x_{i}^{(2)}, \cdots, x_{i}^{(n)}\right)^{\mathrm{T}}$ $L_{p}$ 距离:
$$
L_{p}\left(x_{i}, x_{j}\right)=\left(\sum_{l=1}^{n}\left|x_{i}^{(l)}-x_{j}^{(l)}\right|^{p}\right)^{\frac{1}{p}}
$$
当 $p=2$ 为欧式距离(常用)

当 $p=1 $ 为曼哈顿距离

当 $p=\infty$ 时,是各个坐标距离的最大值
$$
L_{\infty}(x_{i}, x_{j})=\max_{l}\left|x_{i}^{(l)}-x_{j}^{(l)}\right|
$$

1.2 k值选择

k值选择较小时,近似误差小,估计误差大,过拟合

k值选择较大时,近似误差大,可以减少估计误差

Trick: k值一般取一个比较小的数值。 通常采用交叉验证法来选取最优的k值

1.3 分类决策规则

往往是多数表决:由输入实例的k个邻近的训练实例中的多数类决定输入实例的类

误分类率:
$$
\frac{1}{k} \sum_{x_{i} \in N_{k}(x)} I\left(y_{i} \neq c_{j}\right)=1-\frac{1}{k} \sum_{x_{i} \in N_{k}(x)} I\left(y_{i}=c_{j}\right)
$$

2. 模型实现:kd树

实现k近邻法时, 主要考虑的问题是如何对训练数据进行快速k近邻搜索。 这点在特征空间的维数大及训练数据容量大时尤其必要

线性扫描不可取,因为遍历所有的数据计算量太大

解决方法:kd树(kd tree)

kd树是二叉树, 平衡的kd树搜索时的效率未必是最优的

算法 3.2 (构造平衡 $k d$ 树)

输入: $k$ 维空间数据集 $T= \lbrace x_{1}, x_{2}, \cdots, x_{N} \rbrace$,其中 $x_{i}=\left(x_{i}^{(1)}, x_{i}^{(2)}, \cdots, x_{i}^{(k)}\right)^{\mathrm{T}}$,$i=1,2, \cdots, N$
输出: $k d$ 树。
(1) 开始: 构造根结点,根结点对应于包含 $T$ 的 $k$ 维空间的超矩形区域。

​ $x^{(1)}$ 为坐标轴,以 $T$ 中所有实例的 $x^{(1)}$ 坐标的中位数为切分点,将根结点选择对应的超矩形区域切分为两个子区域。切分由通过切分点并与坐标轴 $x^{(1)}$ 垂直的超平面实现。

​ 由根结点生成深度为 1 的左、右子结点: 左子结点对应坐标 $x^{(1)}$ 小于切分点的子$x^{(1)}$ 大于切分点的子区域。 区域,右子结点对应于坐标将落在切分超平面上的实例点保存在根结点。

​ (2)重复: 对深度为 $j$ 的结点,选择 $x^{(l)}$ 为切分的坐标轴,$l=j(\bmod k)+1,$ 以该结点的区域中所有实例的 $x^{(l)}$ 坐标的中位数为切分点,将该结点对应的超矩形区域切分为两个子区域。切分由通过切分点并与坐标轴 $x^{(l)}$ 垂直的超平面实现。
​ 由该结点生成深度为 $j+1$ 的左、右子结点:左子结点对应坐标 $x^{(l)}$ 小于切分点 的子区域,右子结点对应坐标 $x^{(l)}$ 大于切分点的子区域。

​ 将落在切分超平面上的实例点保存在该结点。

​ (3)直到两个子区域没有实例存在时停止。从而形成 $k d$ 树的区域划分.

搜索kd树

给定一个目标点, 搜索其最近邻。 首先找到包含目标点的叶结点; 然后从该叶结点出发, 依次回退到父结点; 不断查找与目标点最邻近的结点, 当确定不可能存在更近的结点时终止

算法 $3.3$(用 $k d$ 树的最近邻搜索)

输入: 已构造的 $k d$ 树,目标点 $x$

输出: $x$ 的最近邻。

(1) 在 $k d$ 树中找出包含目标点 $x$ 的叶结点: 从根结点出发,递归地向下访问 $k d$ 树。若目标点 $x$ 当前维的坐标小于切分点的坐标,则移动到左子结点,否则移动到右子结点。直到子结点为叶结点为止

(2) 以此叶结点为“当前最近点”。

(3) 递归地向上回退,在每个结点进行以下操作:

​ (a)如果该结点保存的实例点比当前最近点距离目标点更近,则以该实例点为“当前最近点”。

​ (b)当前最近点一定存在于该结点一个子结点对应的区域。检查该子结点的父结点的另一子结点对应的区域是否有更近的点。具体地,检查另一子结点对应的 区域是否与以目标点为球心、以目标点与“当前最近点”间的距离为半径的超球体相交。

​ 如果相交,可能在另一个子结点对应的区域内存在距目标点更近的点,移动 到另一个子结点。接着,递归地进行最近邻搜索;

​ 如果不相交,向上回退。

(4) 当回退到根结点时,搜索结束。最后的“当前最近点”即为 $x$ 的最近邻点。

3. 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
## max()函数技巧

## 初级技巧
tmp = max(1,2,4)
print(tmp)
#可迭代对象
a = [1, 2, 3, 4, 5, 6]
tmp = max(a)
print(tmp)

## 中级技巧:key属性的使用
# key参数不为空时,就以key的函数对象为判断的标准。
# 如果我们想找出一组数中绝对值最大的数,就可以配合lamda先进行处理,再找出最大值
a = [-9, -8, 1, 3, -4, 6]
tmp = max(a, key=lambda x: abs(x))
print(tmp)

## 高级技巧:找出字典中值最大的那组数据
#在对字典进行数据操作的时候,默认只会处理key,而不是value
#先使用zip把字典的keys和values翻转过来,再用max取出值最大的那组数据
#这个时候key是值,value是之前的key
#如果有一组商品,其名称和价格都存在一个字典中,可以用下面的方法快速找到价格最贵的那组商品:
prices = {
'A':123,
'B':450.1,
'C':12,
'E':444,
}
# 在对字典进行数据操作的时候,默认只会处理key,而不是value
# 先使用zip把字典的keys和values翻转过来,再用max取出值最大的那组数据
max_prices = max(zip(prices.values(), prices.keys()))
print(max_prices)
#这个时候key是值,value是之前的key
# (450.1, 'B')

#当字典中的value相同的时候,才会比较key
prices = {
'A': 123,
'B': 123,
}
max_prices = max(zip(prices.values(), prices.keys()))print(max_prices) # (123, 'B')
min_prices = min(zip(prices.values(), prices.keys()))print(min_prices) # (123, 'A')

K-NN

1
2
3
4
5
6
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.datasets import load_iris
1
2
3
4
5
## load data
iris = load_iris() # np.array 格式
df = pd.DataFrame(iris.data, columns = iris.feature_names)
df['label'] = iris.target # 添加label列
df[:20] # 查看一下实际数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
data = np.array(df.iloc[:,:]) # 数据由Datafrom转化为array
X, Y = data[:, :-1], data[:, -1]

def _shuffle(X, Y):
randomize = np.arange(len(X))
np.random.shuffle(randomize)
return X[randomize], Y[randomize]
X, Y = _shuffle(X,Y)

def _train_test_split(X, Y, split_ratio = 0.1):
train_size = int(len(X) * (1 - split_ratio))
return X[:train_size], Y[:train_size], X[train_size:], Y[train_size:]

X_train, Y_train, X_test, Y_test = _train_test_split(X, Y, split_ratio = 0.1)
1
2
3
4
5
6
7
8
9
10
11
12
## 数据标准化
def _normalize(X, train_set = True, specified_columns = None, X_mean = None, X_std = None):
if specified_columns == None:
specified_columns = np.arange(len(X[0]))
if train_set:
X_mean = np.mean(X[:, specified_columns], axis = 0)
X_std = np.std(X[:, specified_columns], axis = 0)
X[:, specified_columns] = (X[:,specified_columns] - X_mean) / (X_std + 1e-8)
return X, X_mean, X_std

X_train, X_mean, X_std = _normalize(X_train, train_set=True)
X_test, _, _ = _normalize(X_test, train_set = None, X_mean = X_mean, X_std = X_std)
  • 线性遍历最近的K个点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class KNN_LinearSearch:
def __init__(self, X_train, Y_train, k = 3, p=2):
"""
k: 临近点的个数
p: 距离度量
"""
self.k = k
self.p = p
self.X_train = X_train
self.Y_train = Y_train

def predict(self, X_test):
"""
X_test,该函数找出最近的k个点,并按照多数表决规则进行预测
"""
result = []
for i in range(len(X_test)):
X = X_test[i]
knn_list = []
for j in range(self.k):
dist = np.linalg.norm(X - self.X_train[j], ord=self.p) #np中求线性代数范数的公式
knn_list.append((dist, self.Y_train[j]))
for l in range(self.k, len(self.X_train)):
max_index = knn_list.index(max(knn_list, key=lambda x: x[0]))
dist_new = np.linalg.norm(X-self.X_train[l], ord=self.p)
if knn_list[max_index][0] > dist_new:
knn_list[max_index] = (dist, self.Y_train[l])

# 分类决策规则:多数表决
knn_label = [k[-1] for k in knn_list]
max_count_label = max(knn_label, key=knn_label.count)
result.append(max_count_label)
return result

def Correct_rate(self, X_test, Y_test):
right_count = 0
predictions = self.predict(X_test)
for i in range(len(Y_test)):
if predictions[i] == Y_test[i]:
right_count += 1
return right_count / len(X_test)
1
2
3
4
KNN = KNN_LinearSearch(X_train, Y_train)
KNN.predict(X_test)
# [1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0]
KNN.Correct_rate(X_test, Y_test)
  • Kd树-最近邻搜索
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class KdNode:
def __init__(self, dom_elt, split, left, right):
self.dom_elt = dom_elt # k维向量节点(k维空间中的一个样本点), 分割的那个样本点 dom_elt = [1,2,3,5,6...]
self.split = split # 整数(进行分割纬度的序号)
self.left = left # 该结点分割超平面左子空间构成的kd-tree
self.right = right # 该结点分割超平面右子空间构成的kd-tree

class KdTree:
def __init__(self, data):
dim = len(data[0]) # 数据维度

def CreateNode(split, data_set):
if not data_set:
return None
# 按要进行分割的那一切数据排序
data_set.sort(key=lambda x: x[split]) # 这里sort,key的用法和max类似
split_pos = len(data_set) // 2 # 整数除法
median = data_set[split_pos] # 中位数分割点
split_next = (split + 1) % dim

# 递归的创建kd树
return KdNode(median, split,
CreateNode(split_next, data_set[:split_pos]),
CreateNode(split_next, data_set[split_pos + 1:]))
self.root = CreateNode(0, data) # 从第0维开始创建kd树

# kd树的前序遍历
def preorder(root):
print(root.dom_elt)
if root.left: # 节点不为空
preorder(root.left)
if root.right:
preorder(root.right)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
## 寻找最近的k个点
from math import sqrt
from collections import namedtuple

# 定义一个namedtuple, 分别存放最近坐标点、最近距离和访问过的节点数
result = namedtuple("Result_tuple", "nearest_point nearest_dist nodes_visited")

def find_nearest(tree, point):
dim = len(point)
def travel(kd_node, target, max_dist):
# 迭代终止条件
if kd_node is None:
return result([0] * dim, float("inf"), 0) # float("inf")正无穷,float("-inf")

nodes_visited = 1
s = kd_node.split # 分割的纬度
pivot = kd_node.dom_elt # 进行分割的轴,实例点

if target[s] <= pivot[s]: # 如果目标点第s维小于分割点的对应值
nearer_node = kd_node.left #下一访问节点为左子树根节点
further_node = kd_node.right # 同时记录右子树以便后期与目标点进行比较
else:
nearer_node = kd_node.right
further_node = kd_node.left

temp1 = travel(nearer_node, target, max_dist) # 递归遍历

nearest = temp1.nearest_point # 以此叶节点作为当前最近点
dist = temp1.nearest_dist # 更新最近距离
nodes_visited += temp1.nodes_visited

if dist < max_dist:
max_dist = dist

temp_dist = abs(pivot[s] - target[s]) # 第s维上目标点与分割超平面的距离

if max_dist < temp_dist: # 判断超球体是否与超平面相交
return result(nearest, dist, nodes_visited) # 不相交则可以直接返回,不用继续判断,也就意味这父节点(分割点)的另一子节点不会比现在的子节点与目标点更近

# 计算目标点和分割点的欧氏距离
temp_dist = sqrt(sum((p1 - p2) ** 2 for p1, p2 in zip(pivot, target)))
if temp_dist < dist: # 意味着分割点此时为最近点
nearest = pivot # 更新最近点
dist = temp_dist # 更新最近距离
max_dist = dist # 更新超超球体半径

# 检查另一个子节点对应的区域是否有更近的点
temp2 = travel(further_node, target, max_dist)

nodes_visited += temp2.nodes_visited

if temp2.nearest_dist < dist: # 如果另一个子节点内存在更近距离
nearest = temp2.nearest_point
dist = temp2.nearest_dist

return result(nearest, dist, nodes_visited)
return travel(tree.root, point, float("inf")) # 从根节点递归
1
2
3
4
5
6
7
8
9
data = [[2,3],[5,4],[9,6],[4,7],[8,1],[7,2]]
kd = KdTree(data)
preorder(kd.root)
[7, 2]
[5, 4]
[2, 3]
[4, 7]
[9, 6]
[8, 1]
1
2
3
4
5
6
7
8
9
10
from time import clock
from random import random

# 产生一个k维随机向量,每维分量值在0~1之间
def random_point(k):
return [random() for _ in range(k)]

# 产生n个k维随机向量
def random_points(k, n):
return [random_point(k) for _ in range(n)]
1
2
3
ret = find_nearest(kd, [3, 4.5])
print(ret)
# Result_tuple(nearest_point=[2, 3], nearest_dist=1.8027756377319946, nodes_visited=4)
1
2
3
4
5
6
7
8
9
N = 400000
t0 = clock()
kd2 = KdTree(random_points(3,N)) # 构建包含四十万个3维空间样本点的kd树
ret2 = find_nearest(kd2, [0.1, 0.5, 0.8]) # 四十万个样本点中寻找离目标最近的点
t1 = clock()
print("time: ", t1-t0, "s")
print(ret2)
# time: 5.106322 s
# Result_tuple(nearest_point=[0.10488218676493222, 0.49793482149466295, 0.7951887681764834], nearest_dist=0.007158817047962914, nodes_visited=62)

Reference:

[1] https://github.com/fengdu78/lihang-code/blob/master/%E7%AC%AC03%E7%AB%A0%20k%E8%BF%91%E9%82%BB%E6%B3%95/3.KNearestNeighbors.ipynb

[2] 《统计学习方法》 —李航