基本的分类和回归方法
给定一个训练数据集, 对新的输入实例, 在训练数据集中找到与该实例最邻近的k个实例, 这k个实例的多数属于某个类, 就把该输入实例分为这个类。
1. k 近邻模型
三个基本要素:距离度量,k值选择、分类决策规则
1.1 距离度量
$x_{i}=\left(x_{i}^{(1)}, x_{i}^{(2)}, \cdots, x_{i}^{(n)}\right)^{\mathrm{T}}$ $L_{p}$ 距离:
$$
L_{p}\left(x_{i}, x_{j}\right)=\left(\sum_{l=1}^{n}\left|x_{i}^{(l)}-x_{j}^{(l)}\right|^{p}\right)^{\frac{1}{p}}
$$
当 $p=2$ 为欧式距离(常用)
当 $p=1 $ 为曼哈顿距离
当 $p=\infty$ 时,是各个坐标距离的最大值
$$
L_{\infty}(x_{i}, x_{j})=\max_{l}\left|x_{i}^{(l)}-x_{j}^{(l)}\right|
$$
1.2 k值选择
k值选择较小时,近似误差小,估计误差大,过拟合
k值选择较大时,近似误差大,可以减少估计误差
Trick: k值一般取一个比较小的数值。 通常采用交叉验证法来选取最优的k值
1.3 分类决策规则
往往是多数表决:由输入实例的k个邻近的训练实例中的多数类决定输入实例的类
误分类率:
$$
\frac{1}{k} \sum_{x_{i} \in N_{k}(x)} I\left(y_{i} \neq c_{j}\right)=1-\frac{1}{k} \sum_{x_{i} \in N_{k}(x)} I\left(y_{i}=c_{j}\right)
$$
2. 模型实现:kd树
实现k近邻法时, 主要考虑的问题是如何对训练数据进行快速k近邻搜索。 这点在特征空间的维数大及训练数据容量大时尤其必要
线性扫描不可取,因为遍历所有的数据计算量太大
解决方法:kd树(kd tree)
kd树是二叉树, 平衡的kd树搜索时的效率未必是最优的
算法 3.2 (构造平衡 $k d$ 树)
输入: $k$ 维空间数据集 $T= \lbrace x_{1}, x_{2}, \cdots, x_{N} \rbrace$,其中 $x_{i}=\left(x_{i}^{(1)}, x_{i}^{(2)}, \cdots, x_{i}^{(k)}\right)^{\mathrm{T}}$,$i=1,2, \cdots, N$
输出: $k d$ 树。
(1) 开始: 构造根结点,根结点对应于包含 $T$ 的 $k$ 维空间的超矩形区域。
$x^{(1)}$ 为坐标轴,以 $T$ 中所有实例的 $x^{(1)}$ 坐标的中位数为切分点,将根结点选择对应的超矩形区域切分为两个子区域。切分由通过切分点并与坐标轴 $x^{(1)}$ 垂直的超平面实现。
由根结点生成深度为 1 的左、右子结点: 左子结点对应坐标 $x^{(1)}$ 小于切分点的子$x^{(1)}$ 大于切分点的子区域。 区域,右子结点对应于坐标将落在切分超平面上的实例点保存在根结点。
(2)重复: 对深度为 $j$ 的结点,选择 $x^{(l)}$ 为切分的坐标轴,$l=j(\bmod k)+1,$ 以该结点的区域中所有实例的 $x^{(l)}$ 坐标的中位数为切分点,将该结点对应的超矩形区域切分为两个子区域。切分由通过切分点并与坐标轴 $x^{(l)}$ 垂直的超平面实现。
由该结点生成深度为 $j+1$ 的左、右子结点:左子结点对应坐标 $x^{(l)}$ 小于切分点 的子区域,右子结点对应坐标 $x^{(l)}$ 大于切分点的子区域。
将落在切分超平面上的实例点保存在该结点。
(3)直到两个子区域没有实例存在时停止。从而形成 $k d$ 树的区域划分.
搜索kd树
给定一个目标点, 搜索其最近邻。 首先找到包含目标点的叶结点; 然后从该叶结点出发, 依次回退到父结点; 不断查找与目标点最邻近的结点, 当确定不可能存在更近的结点时终止
算法 $3.3$(用 $k d$ 树的最近邻搜索)
输入: 已构造的 $k d$ 树,目标点 $x$
输出: $x$ 的最近邻。
(1) 在 $k d$ 树中找出包含目标点 $x$ 的叶结点: 从根结点出发,递归地向下访问 $k d$ 树。若目标点 $x$ 当前维的坐标小于切分点的坐标,则移动到左子结点,否则移动到右子结点。直到子结点为叶结点为止。
(2) 以此叶结点为“当前最近点”。
(3) 递归地向上回退,在每个结点进行以下操作:
(a)如果该结点保存的实例点比当前最近点距离目标点更近,则以该实例点为“当前最近点”。
(b)当前最近点一定存在于该结点一个子结点对应的区域。检查该子结点的父结点的另一子结点对应的区域是否有更近的点。具体地,检查另一子结点对应的 区域是否与以目标点为球心、以目标点与“当前最近点”间的距离为半径的超球体相交。
如果相交,可能在另一个子结点对应的区域内存在距目标点更近的点,移动 到另一个子结点。接着,递归地进行最近邻搜索;
如果不相交,向上回退。
(4) 当回退到根结点时,搜索结束。最后的“当前最近点”即为 $x$ 的最近邻点。
3. 代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
tmp = max(1,2,4) print(tmp)
a = [1, 2, 3, 4, 5, 6] tmp = max(a) print(tmp)
a = [-9, -8, 1, 3, -4, 6] tmp = max(a, key=lambda x: abs(x)) print(tmp)
prices = { 'A':123, 'B':450.1, 'C':12, 'E':444, }
max_prices = max(zip(prices.values(), prices.keys())) print(max_prices)
prices = { 'A': 123, 'B': 123, } max_prices = max(zip(prices.values(), prices.keys()))print(max_prices) min_prices = min(zip(prices.values(), prices.keys()))print(min_prices)
|
K-NN
1 2 3 4 5 6
| import numpy as np import pandas as pd import matplotlib.pyplot as plt %matplotlib inline
from sklearn.datasets import load_iris
|
1 2 3 4 5
| iris = load_iris() df = pd.DataFrame(iris.data, columns = iris.feature_names) df['label'] = iris.target df[:20]
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| data = np.array(df.iloc[:,:]) X, Y = data[:, :-1], data[:, -1]
def _shuffle(X, Y): randomize = np.arange(len(X)) np.random.shuffle(randomize) return X[randomize], Y[randomize] X, Y = _shuffle(X,Y)
def _train_test_split(X, Y, split_ratio = 0.1): train_size = int(len(X) * (1 - split_ratio)) return X[:train_size], Y[:train_size], X[train_size:], Y[train_size:]
X_train, Y_train, X_test, Y_test = _train_test_split(X, Y, split_ratio = 0.1)
|
1 2 3 4 5 6 7 8 9 10 11 12
| def _normalize(X, train_set = True, specified_columns = None, X_mean = None, X_std = None): if specified_columns == None: specified_columns = np.arange(len(X[0])) if train_set: X_mean = np.mean(X[:, specified_columns], axis = 0) X_std = np.std(X[:, specified_columns], axis = 0) X[:, specified_columns] = (X[:,specified_columns] - X_mean) / (X_std + 1e-8) return X, X_mean, X_std
X_train, X_mean, X_std = _normalize(X_train, train_set=True) X_test, _, _ = _normalize(X_test, train_set = None, X_mean = X_mean, X_std = X_std)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| class KNN_LinearSearch: def __init__(self, X_train, Y_train, k = 3, p=2): """ k: 临近点的个数 p: 距离度量 """ self.k = k self.p = p self.X_train = X_train self.Y_train = Y_train def predict(self, X_test): """ X_test,该函数找出最近的k个点,并按照多数表决规则进行预测 """ result = [] for i in range(len(X_test)): X = X_test[i] knn_list = [] for j in range(self.k): dist = np.linalg.norm(X - self.X_train[j], ord=self.p) knn_list.append((dist, self.Y_train[j])) for l in range(self.k, len(self.X_train)): max_index = knn_list.index(max(knn_list, key=lambda x: x[0])) dist_new = np.linalg.norm(X-self.X_train[l], ord=self.p) if knn_list[max_index][0] > dist_new: knn_list[max_index] = (dist, self.Y_train[l]) knn_label = [k[-1] for k in knn_list] max_count_label = max(knn_label, key=knn_label.count) result.append(max_count_label) return result def Correct_rate(self, X_test, Y_test): right_count = 0 predictions = self.predict(X_test) for i in range(len(Y_test)): if predictions[i] == Y_test[i]: right_count += 1 return right_count / len(X_test)
|
1 2 3 4
| KNN = KNN_LinearSearch(X_train, Y_train) KNN.predict(X_test)
KNN.Correct_rate(X_test, Y_test)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| class KdNode: def __init__(self, dom_elt, split, left, right): self.dom_elt = dom_elt self.split = split self.left = left self.right = right
class KdTree: def __init__(self, data): dim = len(data[0]) def CreateNode(split, data_set): if not data_set: return None data_set.sort(key=lambda x: x[split]) split_pos = len(data_set) // 2 median = data_set[split_pos] split_next = (split + 1) % dim return KdNode(median, split, CreateNode(split_next, data_set[:split_pos]), CreateNode(split_next, data_set[split_pos + 1:])) self.root = CreateNode(0, data)
def preorder(root): print(root.dom_elt) if root.left: preorder(root.left) if root.right: preorder(root.right)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| from math import sqrt from collections import namedtuple
result = namedtuple("Result_tuple", "nearest_point nearest_dist nodes_visited")
def find_nearest(tree, point): dim = len(point) def travel(kd_node, target, max_dist): if kd_node is None: return result([0] * dim, float("inf"), 0) nodes_visited = 1 s = kd_node.split pivot = kd_node.dom_elt if target[s] <= pivot[s]: nearer_node = kd_node.left further_node = kd_node.right else: nearer_node = kd_node.right further_node = kd_node.left temp1 = travel(nearer_node, target, max_dist) nearest = temp1.nearest_point dist = temp1.nearest_dist nodes_visited += temp1.nodes_visited if dist < max_dist: max_dist = dist temp_dist = abs(pivot[s] - target[s]) if max_dist < temp_dist: return result(nearest, dist, nodes_visited) temp_dist = sqrt(sum((p1 - p2) ** 2 for p1, p2 in zip(pivot, target))) if temp_dist < dist: nearest = pivot dist = temp_dist max_dist = dist temp2 = travel(further_node, target, max_dist) nodes_visited += temp2.nodes_visited if temp2.nearest_dist < dist: nearest = temp2.nearest_point dist = temp2.nearest_dist return result(nearest, dist, nodes_visited) return travel(tree.root, point, float("inf"))
|
1 2 3 4 5 6 7 8 9
| data = [[2,3],[5,4],[9,6],[4,7],[8,1],[7,2]] kd = KdTree(data) preorder(kd.root) [7, 2] [5, 4] [2, 3] [4, 7] [9, 6] [8, 1]
|
1 2 3 4 5 6 7 8 9 10
| from time import clock from random import random
def random_point(k): return [random() for _ in range(k)]
def random_points(k, n): return [random_point(k) for _ in range(n)]
|
1 2 3
| ret = find_nearest(kd, [3, 4.5]) print(ret)
|
1 2 3 4 5 6 7 8 9
| N = 400000 t0 = clock() kd2 = KdTree(random_points(3,N)) ret2 = find_nearest(kd2, [0.1, 0.5, 0.8]) t1 = clock() print("time: ", t1-t0, "s") print(ret2)
|
Reference:
[1] https://github.com/fengdu78/lihang-code/blob/master/%E7%AC%AC03%E7%AB%A0%20k%E8%BF%91%E9%82%BB%E6%B3%95/3.KNearestNeighbors.ipynb
[2] 《统计学习方法》 —李航