🧠 使用 Python 实现高效凝聚层次聚类(Agglomerative Clustering)

在本篇博客中,我们将一起探索如何使用 Python 实现一个高效的 凝聚层次聚类(Agglomerative Hierarchical Clustering)算法,并支持三种不同的簇间距离度量方式:single linkage, complete linkageaverage linkage。我们还将通过可视化展示不同方法的效果,并分析其优劣。

📚 一、什么是凝聚层次聚类?

凝聚层次聚类是一种 自底向上的无监督聚类算法,它的核心思想是:

每个数据点初始时都是一个独立的簇,然后逐步合并最相似的两个簇,直到达到预设的簇数。

与 K-Means 不同,它不需要提前指定簇的数量即可构建整个聚类树(dendrogram),因此非常适合探索性数据分析。

⚙️ 二、代码概述

🔍 功能亮点:

  • 支持三种 linkage 方法
    • 'single': 最短距离法(最近邻)
    • 'complete': 最长距离法(最远邻)
    • 'average': 平均距离法
  • 使用 最小堆(heapq) 加速查找最近簇对
  • 对 Iris 数据集进行特征选择后聚类
  • 绘制原始标签与不同 linkage 聚类结果对比图

📦 三、依赖库介绍

1
2
3
4
5
6
7
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import heapq
from scipy.spatial.distance import squareform, pdist
from collections import defaultdict

用途
numpy 高效数组操作
pandas 数据读取与处理
matplotlib 可视化绘图
heapq 最小堆实现优先队列
scipy.spatial.distance 计算成对距离

📐 四、核心函数详解

1. 簇间距离计算函数

1
2
3
4
5
6
7
8
9
10
def cluster_distance(c1, c2, dist_matrix, method='single'):
if method == 'single':
return np.min(dist_matrix[np.ix_(c1, c2)])
elif method == 'complete':
return np.max(dist_matrix[np.ix_(c1, c2)])
elif method == 'average':
return np.mean(dist_matrix[np.ix_(c1, c2)])
else:
raise ValueError("Unknown method")

  • 功能:根据指定的 linkage 方法计算两个簇之间的距离。
  • 使用场景
    • 'single': 用于识别链状分布的数据。
    • 'complete': 更关注簇内紧密性。
    • 'average': 两者折中,适合大多数情况 。

2. 初始化距离矩阵

1
2
dist_matrix = squareform(pdist(features[:, feature_indices]))

  • pdist:计算所有样本两两之间的欧氏距离。
  • squareform:将压缩的距离向量转换为 N×N 的距离矩阵 。

3. 初始化每个样本为独立簇

1
2
init_clusters = [[i] for i in range(numberOfsamples)]

  • 初始时每个样本是一个簇,用索引表示。

4. 构建最小堆维护簇对距离

1
2
3
4
5
for i in range(len(clusters)):
for j in range(i+1, len(clusters)):
dist = cluster_distance(clusters[i], clusters[j], dist_matrix, method)
heapq.heappush(heap, (dist, i, j))

  • 使用 heapq 构建最小堆,每次取出当前最近的簇对进行合并。
  • 时间复杂度从 O(N³) 降低至 O(N² log N) 。

5. 合并簇并更新堆

1
2
3
4
5
6
7
8
9
10
11
12
while sum(valid) > numberOfcluster:
while True:
dist, i, j = heapq.heappop(heap)
if valid[i] and valid[j]:
break
clusters[i].extend(clusters[j])
valid[j] = False
for k in range(len(clusters)):
if k != i and valid[k]:
dist = cluster_distance(clusters[i], clusters[k], dist_matrix, method)
heapq.heappush(heap, (dist, i, k))

  • 有效簇标记:使用 valid 数组记录哪些簇是有效的。
  • 动态更新距离:合并后重新计算新簇与其他簇的距离并插入堆中。

📊 六、可视化展示

我们绘制了四种图像:

  1. 原始数据按真实标签显示
  2. 'single' linkage 聚类结果
  3. 'complete' linkage 聚类结果
  4. 'average' linkage 聚类结果

每种方法都使用不同颜色和形状的标记来区分簇。

示例代码片段:

1
2
3
4
5
6
7
8
plt.scatter(
features[clu, feature_indices[0]],
features[clu, feature_indices[1]],
color=[color],
marker=marker,
label=f'Cluster {idx+1}'
)

Agglomerative Clustering


🧩 七、三种 Linkage 方法对比

Linkage 方法 特点 适用场景
'single' 容易形成“链式”簇 适用于流形型数据
'complete' 强调簇内紧凑性 适用于球形分布数据
'average' 折中方案,平衡表现 通用性强

📚 参考资料

  • [1] Scipy pdist & squareform 文档
  • [2] NumPy 与 Pandas 简介
  • [3] 密度峰值聚类算法综述
  • [4] 自适应密度峰值聚类算法
  • [5] 层次聚类与 DPC 算法对比

🧠 使用 Python 实现向量化 K-Means 聚类算法

K-Means 是一种经典的无监督聚类算法,广泛应用于数据挖掘、图像压缩、客户分群等领域。本文将带你一步步理解并实现一个高效的向量化 K-Means 算法,并通过 sklearn 加载 Iris 数据集进行实战演示。

📦 一、环境依赖与数据准备

我们使用如下库:

1
2
3
4
5
import matplotlib.pyplot as plt
import numpy as np
from sklearn.cluster import KMeans
from sklearn import datasets

✅ 数据加载

1
2
3
4
iris = datasets.load_iris()
X = iris.data[:, :4]
numberOfcluster = len(iris.target_names)

  • X: 特征矩阵(150 × 4)
  • numberOfcluster: 类别数为 3

⚙️ 二、使用 sklearn 的 KMeans 进行聚类

我们先调用 sklearn.cluster.KMeans 快速完成聚类任务:

1
2
3
4
estimator = KMeans(n_clusters=numberOfcluster)
estimator.fit(X)
label_pred = estimator.labels_

然后绘制原始数据和聚类后的结果图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
plt.figure(figsize=(12, 8))
plt.subplot(121)
plt.scatter(X[:, 1], X[:, 2], c="red", marker='o', label='data')
plt.xlabel('sepal length')
plt.ylabel('sepal width')
plt.legend(loc=2)

plt.subplot(122)
x0 = X[label_pred == 0]
x1 = X[label_pred == 1]
x2 = X[label_pred == 2]
plt.scatter(x0[:, 1], x0[:, 2], c="red", marker='o', label='label0')
plt.scatter(x1[:, 1], x1[:, 2], c="green", marker='*', label='label1')
plt.scatter(x2[:, 1], x2[:, 2], c="blue", marker='+', label='label2')
plt.xlabel('sepal length')
plt.ylabel('sepal width')
plt.legend(loc=2)
plt.show()

✅ 使用 sklearn 可以快速完成聚类,但了解其内部实现更有助于深入理解算法原理 。

K-Means


🧩 三、手动实现向量化 K-Means

为了更深入理解 K-Means 的工作原理,我们实现了向量化版本的 K-Means 算法,避免了传统的嵌套循环,提高了计算效率。

1. 向量化计算距离矩阵

1
2
3
4
5
6
7
def compute_distances(dataSet, centroids):
m = dataSet.shape[0]
k = centroids.shape[0]
diff = dataSet[:, np.newaxis, :] - centroids[np.newaxis, :, :]
distances = np.sum(diff**2, axis=2)
return distances

  • 利用 NumPy 广播机制一次性计算所有点到质心的距离。
  • 避免双重 for 循环,提升性能 。

2. 分配样本到最近质心

1
2
3
def assign_clusters(distances):
return np.argmin(distances, axis=1)

  • 沿列方向取最小值索引,确定每个样本所属簇。

3. 随机初始化质心

1
2
3
4
5
6
7
8
def randomCenter(dataSet, k): 
m, n = dataSet.shape
centroids = np.zeros((k, n))
for i in range(k):
index = int(np.random.uniform(0, m))
centroids[i, :] = dataSet[index, :]
return centroids

  • 从数据集中随机选择 k 个样本作为初始质心 。

4. 更新质心

1
2
3
4
5
6
7
8
def update_centroids(dataSet, cluster_indices, k):
centroids = np.zeros((k, dataSet.shape[1]))
for j in range(k):
mask = (cluster_indices == j)
if np.any(mask):
centroids[j] = np.mean(dataSet[mask], axis=0)
return centroids

  • 对每个簇计算均值作为新质心。

5. 主函数:向量化 K-Means

1
2
3
4
5
6
7
8
9
10
11
12
13
def KMeans_vectorized(dataSet, k, max_iter=100):
centroids = randomCenter(dataSet, k)

for iter_num in range(max_iter):
distances = compute_distances(dataSet, centroids)
cluster_indices = assign_clusters(distances)
new_centroids = update_centroids(dataSet, cluster_indices, k)

if np.allclose(centroids, new_centroids):
break
centroids = new_centroids
return centroids, cluster_indices, iter_num

  • 收敛条件:新旧质心几乎相等时停止迭代。
  • 最终返回质心、簇标签和迭代次数。

📊 四、可视化聚类结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
plt.figure(figsize=(12, 8))
colors_list = ['red', 'green', 'blue']
markers_list = ['o', 's', '^']

for i in range(k):
plt.scatter(
dataSet[clusterAssment == i, 1],
dataSet[clusterAssment == i, 3],
c=colors_list[i],
marker=markers_list[i],
label=f'Cluster {i+1}'
)
plt.scatter(
centroids[i, 1],
centroids[i, 3],
c=colors_list[i],
marker='x',
s=100,
label='Centroids'
)

plt.xlabel('Feature 0')
plt.ylabel('Feature 1')
plt.title(f'KMeans Clustering Result in iter num: {iter_num}')
plt.legend()
plt.show()

  • 不同颜色代表不同簇。
  • 不同标记区分簇内点与质心。

K-Means


📌 五、总结

功能 描述
向量化实现 提升聚类速度,适用于大规模数据
随机初始化 简单易懂,但可能陷入局部最优
收敛判断 使用 np.allclose() 避免无限迭代
可视化支持 帮助直观理解聚类效果

📚 六、参考资料

  • [1] Scikit-learn 官方文档:KMeans 聚类介绍
  • [2] KMeans 初始化方法详解:k-means++ vs random
  • [3] NumPy 和 Matplotlib 在机器学习中的应用
  • [4] 向量化计算提高效率的方法

🧠 使用 Python 实现自定义 DBSCAN 聚类算法

DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的聚类算法,能够发现任意形状的簇,并且能识别出噪声点。本篇博客将带你一步步实现一个高效的 自定义 DBSCAN 算法,并使用极坐标方式生成测试样本数据集进行验证。

📦 一、环境依赖与数据准备

我们使用如下库:

1
2
3
4
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist

✅ 生成测试样本

我们通过极坐标生成两个不同半径范围内的样本点,模拟环形分布的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def generate_sample():
number_of_samples = 1000
# 内部小圆:20% 数据
theta = np.random.uniform(0, 2*np.pi, int(0.2*number_of_samples))
r = np.sqrt(np.random.uniform(0, 4, int(0.2*number_of_samples)))
x = r * np.cos(theta)
y = r * np.sin(theta)
X_1 = np.column_stack((x, y))

# 外部大圆:80% 数据
theta = np.random.uniform(0, 2*np.pi, number_of_samples)
r = np.sqrt(np.random.uniform(25, 36, number_of_samples))
x = r * np.cos(theta)
y = r * np.sin(theta)
X_2 = np.column_stack((x, y))

# 可视化
plt.figure(figsize=(12, 8))
plt.scatter(X_1[:, 0], X_1[:, 1], color='r')
plt.scatter(X_2[:, 0], X_2[:, 1], color='b')
plt.title('Generated Sample Data (Polar Coordinates)')
plt.xlabel('X')
plt.ylabel('Y')
plt.show()

return np.vstack((X_1, X_2))

✅ 该方法参考了随机采样策略,适用于测试和可视化分析。


⚙️ 二、DBSCAN 类设计与实现

1. 初始化参数

1
2
3
4
5
6
7
class DBSCAN:
def __init__(self, eps=3):
self.eps = eps
self.value_threshold = 750
self.labels = None
self.cluster_centers = []

参数 说明
eps 邻域半径,用于判断邻近点
value\_threshold 密度峰值选择阈值

2. 计算距离与密度

1
2
3
distances = cdist(X, X, metric='euclidean')
density = np.sum(distances < self.eps, axis=1)

  • 使用 cdist 一次性计算所有点之间的欧氏距离
  • 每个点的局部密度是其邻域内点的数量

3. 计算最小距离

1
2
3
4
5
6
7
8
9
min_distances = []
for i in range(n_samples):
greater_density_indices = np.where(density > density[i])[0]
if len(greater_density_indices) > 0:
min_distance = np.min(distances[i, greater_density_indices])
min_distances.append(min_distance)
else:
min_distances.append(np.max(distances[i]))

  • 对每个点,找到比它密度高的点,并计算到这些点的最小距离

4. 选取聚类中心

1
2
3
4
5
6
7
8
density_times_min_distance = density * np.array(min_distances)
idx = np.argsort(density_times_min_distance)[::-1]

mask = density_times_min_distance[idx] > self.value_threshold
selected_indices = idx[mask]
self.labels[selected_indices] = selected_indices
self.cluster_centers.extend(selected_indices.tolist())

  • 通过 density × min\_distance 排序,结合阈值筛选聚类中心

5. 广度优先扩展聚类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Have_labeled = []
for label in cluster_centers.keys():
queue_center = cluster_centers[label].copy()
Have_labeled.extend(queue_center)

while queue_center:
current = queue_center.pop(0)
neighbors = np.where(distances[current] < self.eps)[0]
for neighbor in neighbors:
if neighbor not in Have_labeled:
self.labels[neighbor] = self.labels[current]
Have_labeled.append(neighbor)
queue_center.append(neighbor)

  • 使用 BFS 扩展簇,将邻域点标记为相同标签

6. 统一标签编号

1
2
3
4
5
6
final_labels = np.unique(self.labels)
label_map = {label: i for i, label in enumerate(final_labels)}
if -1 in label_map:
label_map[-1] = -1
self.labels = np.array([label_map[label] for label in self.labels])

  • 将原始索引标签映射为连续整数,便于后续处理

📊 三、可视化聚类结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
unique_labels = np.unique(labels)
colors = plt.get_cmap('tab10', len(unique_labels))

plt.figure(figsize=(12, 8))
for idx, label in enumerate(unique_labels):
mask = labels == label
plt.scatter(X[mask, 0], X[mask, 1], s=10, color=colors(idx),
label=f'Cluster {label}' if label != -1 else 'Noise')

plt.title('DBSCAN Clustering Result')
plt.xlabel('X1')
plt.ylabel('X2')
plt.legend()
plt.show()

  • 不同颜色代表不同簇
  • -1 表示噪声点

Raw division
DBSCAN


📌 四、总结与优化建议

功能 描述
极坐标生成样本 更适合环形或球形数据集
向量化操作 提升性能,避免双重循环
支持噪声识别 是 DBSCAN 的核心优势之一
可视化清晰 帮助理解聚类效果

✅ 优化建议:

  • 使用 K-D TreeBall Tree 加速邻域查询
  • 替换固定阈值为动态选择策略
  • 支持自动确定 epsmin\_samples
  • 使用 numbaCython 加速核心循环部分

📚 五、参考资料

  • [1] scipy.spatial.distance.cdist 用法
  • [2] 测试样本生成方法
  • [3] 聚类算法在机器学习中的应用
  • [4] DBSCAN 算法原理详解

🎉 感**谢阅读,希望这篇博客对你理解和实现 DBSCAN 聚类有所帮助!