K-means is the most popular clustering algorithm. The basic idea is that it places samples in a high-dimensional space according to their attributes and groups samples that are close to each other. It's easy to understand because the math involved is not complicated.
If you are taking Andrew Ng's machine learning course on Coursera but don't like Matlab/Octave, this post is for you. K-means is introduced in week 8 of the course. Lecture notes 7a on the Stanford website are also helpful.
Before We Get Started
For this tutorial, I assume you know the following:
- Python (list comprehensions, basic OOP)
- NumPy (broadcasting)
- Basic Linear Algebra
- Basic machine learning concepts
My code follows the scikit-learn style. If you are unfamiliar with scikit-learn, I recommend you check out the website. I also briefly mention it in my post, K-Nearest Neighbor from Scratch in Python.
I'm using Python 3. If you want to use Python 2, add this line at the beginning of your file and everything should work fine:
from __future__ import division
Initial Code
Our k-means class takes 3 parameters: the number of clusters, the maximum number of iterations, and a random state.
import numpy as np

class KMeans(object):
    def __init__(self, n_clusters=8, max_iter=300, random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.random_state = random_state
Exercise 1
The first thing we need to do is initialize clusters by randomly selecting k samples. This is actually not as simple as it might seem. There are a lot of ways to do it:
def fit(self, X):
    if self.random_state:
        np.random.seed(self.random_state)
    # Your code here
    return self
X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1).fit(X)
print(kmeans.cluster_centers_)
Our toy example consists of 5 samples with 2 attributes. In this example, n_clusters=2, so it randomly picks 2 samples from X as the initial cluster centers.
[[2 2]
[1 2]]
Solution
The best way I found is to use np.random.permutation:
initial = np.random.permutation(X.shape[0])[:self.n_clusters]
self.cluster_centers_ = X[initial]
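As an aside, np.random.choice with replace=False should do the same job, since it also guarantees that the k picked rows are distinct. This is just an alternative sketch; the permutation version above is what the rest of the post uses:

# alternative sketch: draw k distinct row indices directly
initial = np.random.choice(X.shape[0], size=self.n_clusters, replace=False)
self.cluster_centers_ = X[initial]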
Exercise 2
Before we continue with fit, we need to write a few helper methods. We want to measure the distance between a cluster center and a sample using the Euclidean distance:
def _distance(self, a, b):
    # Your code here

kmeans = KMeans()
print(kmeans._distance(np.array([1,-1]), np.array([3, 3])))
Output:
4.472135955
which is \(\sqrt{(1 - 3)^2 + (-1 - 3)^2}\).
Solution
def _distance(self, a, b):
    return np.sqrt(((a - b)**2).sum())
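For reference, np.linalg.norm computes the same Euclidean norm, so this one-liner is an equivalent alternative:

def _distance(self, a, b):
    # Euclidean (L2) norm of the difference vector
    return np.linalg.norm(a - b)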
Exercise 3
Next, we are going to write a method to choose the nearest cluster given a sample. Your code should return the index of the nearest cluster, not the cluster itself:
def _nearest(self, clusters, x):
    # Your code

kmeans = KMeans()
print(kmeans._nearest(np.array([[1,2],[2,2]]), np.array([4,5])))
The output is 1 because [4,5] is closer to [2,2] than to [1,2].
Solution
You can use numpy.argmin:
def _nearest(self, clusters, x):
    return np.argmin([self._distance(x, c) for c in clusters])
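Since broadcasting is one of the prerequisites, here is an equivalent vectorized sketch. It skips the square root because argmin over the squared distances picks the same index, and np.asarray is there so it also accepts a plain list of centers (which is what we end up storing later):

def _nearest(self, clusters, x):
    # squared distance from x to every center at once via broadcasting
    clusters = np.asarray(clusters)
    return np.argmin(((clusters - x)**2).sum(axis=1))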
Iteration
So far our k-means looks like this:
class KMeans(object):
    def __init__(self, n_clusters=8, max_iter=300, random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.random_state = random_state

    def fit(self, X):
        if self.random_state:
            np.random.seed(self.random_state)
        initial = np.random.permutation(X.shape[0])[:self.n_clusters]
        self.cluster_centers_ = X[initial]
        return self

    def _nearest(self, clusters, x):
        return np.argmin([self._distance(x, c) for c in clusters])

    def _distance(self, a, b):
        return np.sqrt(((a - b)**2).sum())
Exercise 4
Let's go back to fit. Inside the for loop is where the actual learning happens. self.labels_ holds, for each sample, the index of its nearest cluster center. Group the samples according to the clusters:
def fit(self, X):
    if self.random_state:
        np.random.seed(self.random_state)
    initial = np.random.permutation(X.shape[0])[:self.n_clusters]
    self.cluster_centers_ = X[initial]
    for _ in range(self.max_iter):
        self.labels_ = [self._nearest(self.cluster_centers_, x) for x in X]
        # Your code here
        print(self.cluster_centers_)
        print(indices)
        return
X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1)
kmeans.fit(X)
[2,2], [4,5], and [5,4] are closer to the first cluster center [2, 2], so their indices belong to the first list of indices. Similarly, [1,1] and [1,2] are closer to the second cluster center [1, 2]:
[[2 2]
[1 2]]
[[2, 3, 4], [0, 1]]
Solution
My first attempt was this:
indices = [[] for _ in range(self.n_clusters)]
for i, l in enumerate(self.labels_):
    indices[l].append(i)
After that I found another solution using a list comprehension:
indices = [[i for i, l in enumerate(self.labels_) if l == j]
           for j in range(self.n_clusters)]
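If you'd rather lean on NumPy, np.where gives the same grouping; just note that it returns index arrays rather than lists, which is still fine for indexing X in the next exercise. This is only an aside, and the list comprehension above is what the rest of the post uses:

labels = np.asarray(self.labels_)
# np.where returns the positions where the condition is true
indices = [np.where(labels == j)[0] for j in range(self.n_clusters)]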
Exercise 5
I added X_by_cluster, which is a list of the actual samples grouped by cluster. Now we can update self.cluster_centers_ by averaging these samples:
def fit(self, X):
    if self.random_state:
        np.random.seed(self.random_state)
    initial = np.random.permutation(X.shape[0])[:self.n_clusters]
    self.cluster_centers_ = X[initial]
    for _ in range(self.max_iter):
        self.labels_ = [self._nearest(self.cluster_centers_, x) for x in X]
        indices = [[i for i, l in enumerate(self.labels_) if l == j]
                   for j in range(self.n_clusters)]
        X_by_cluster = [X[i] for i in indices]
        self.cluster_centers_ = # Your code here
    return self
X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1)
kmeans.fit(X)
print(kmeans.cluster_centers_)
Output:
[array([ 3.66666667, 3.66666667]), array([ 1. , 1.5])]
Solution
self.cluster_centers_ = [c.sum(axis=0) / len(c) for c in X_by_cluster]
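Since c.sum(axis=0) / len(c) is just the column-wise mean, c.mean(axis=0) is an equivalent, slightly shorter spelling:

# equivalent: NumPy's built-in column-wise mean
self.cluster_centers_ = [c.mean(axis=0) for c in X_by_cluster]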
Exercise 6
When using an optimization algorithm like gradient descent, there is an error term that it explicitly tries to minimize. K-means doesn't compute an explicit error term during training, but we still have a way to measure error. It's called inertia. Inertia is the sum of squared distances from each sample to its closest cluster center. The smaller the inertia, the better.
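In symbols, if \(x_1, \dots, x_n\) are the samples, \(\mu_j\) the cluster centers, and \(l_i\) the label our _nearest method assigns to \(x_i\) (these symbols are just my shorthand, not from the course notes), then

\[ \text{inertia} = \sum_{i=1}^{n} \lVert x_i - \mu_{l_i} \rVert^2 \]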
def fit(self, X):
    if self.random_state:
        np.random.seed(self.random_state)
    initial = np.random.permutation(X.shape[0])[:self.n_clusters]
    self.cluster_centers_ = X[initial]
    for _ in range(self.max_iter):
        self.labels_ = [self._nearest(self.cluster_centers_, x) for x in X]
        indices = [[i for i, l in enumerate(self.labels_) if l == j]
                   for j in range(self.n_clusters)]
        X_by_cluster = [X[i] for i in indices]
        # update the clusters
        self.cluster_centers_ = [c.sum(axis=0) / len(c) for c in X_by_cluster]
    # sum of squared distances from each sample to its closest cluster center
    self.inertia_ = # Your code here
    return self
X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1)
kmeans.fit(X)
print(kmeans.inertia_)
Output:
2.33333333333
Solution
self.inertia_ = sum(((self.cluster_centers_[l] - x)**2).sum()
                    for x, l in zip(X, self.labels_))
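If you prefer to avoid the generator expression, broadcasting gives the same number. This sketch assumes X is a NumPy array, which it is in all of our examples:

# pick, for each sample, the center it was assigned to, then sum the squared differences
centers = np.asarray(self.cluster_centers_)
self.inertia_ = ((X - centers[self.labels_])**2).sum()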
You can also move the code inside the for loop and print it out after each iteration to make sure your code is working.
As the number of clusters k increases, inertia decreases. Eventually, when k equals the sample size, inertia is 0 because every sample is its own cluster center. That does not mean you should just increase k, though!
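You can verify this on the toy data with a quick loop over k. The exact values depend on which samples the initialization happens to pick, but the downward trend, ending at 0 when k equals the number of samples, should show up:

X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
for k in range(1, len(X) + 1):
    model = KMeans(n_clusters=k, max_iter=5, random_state=1).fit(X)
    print(k, model.inertia_)  # inertia shrinks as k grows and hits 0 at k = 5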
Also, the score method in scikit-learn returns the negative inertia:
def score(self, X):
    return -self.inertia_
Here is the complete code:
class KMeans(object):
    def __init__(self, n_clusters=8, max_iter=300, random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.random_state = random_state

    def fit(self, X):
        if self.random_state:
            np.random.seed(self.random_state)
        initial = np.random.permutation(X.shape[0])[:self.n_clusters]
        self.cluster_centers_ = X[initial]
        for _ in range(self.max_iter):
            self.labels_ = [self._nearest(self.cluster_centers_, x) for x in X]
            indices = [[i for i, l in enumerate(self.labels_) if l == j]
                       for j in range(self.n_clusters)]
            X_by_cluster = [X[i] for i in indices]
            # update the clusters
            self.cluster_centers_ = [c.sum(axis=0) / len(c) for c in X_by_cluster]
        # sum of squared distances from each sample to its closest cluster center
        self.inertia_ = sum(((self.cluster_centers_[l] - x)**2).sum()
                            for x, l in zip(X, self.labels_))
        return self

    def _nearest(self, clusters, x):
        return np.argmin([self._distance(x, c) for c in clusters])

    def _distance(self, a, b):
        return np.sqrt(((a - b)**2).sum())

    def predict(self, X):
        return self.labels_

    def transform(self, X):
        return [[self._distance(x, c) for c in self.cluster_centers_] for x in X]

    def fit_predict(self, X):
        return self.fit(X).predict(X)

    def fit_transform(self, X):
        return self.fit(X).transform(X)

    def score(self, X):
        return -self.inertia_
predict just returns self.labels_. transform maps the original samples into a cluster-distance space: each dimension of x is now the distance to a cluster center instead of an attribute. fit_predict and fit_transform are just shortcuts.
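To wrap up, here is a small usage sketch on the toy data from earlier; the exact numbers depend on the random initialization, so treat the printed values as illustrative rather than reference output:

X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1)

labels = kmeans.fit_predict(X)     # cluster index for each sample
distances = kmeans.transform(X)    # each row: distances to the 2 cluster centers
print(labels)
print(distances)
print(kmeans.score(X))             # negative inertia; closer to 0 is better

If you want an extra sanity check, you can fit sklearn.cluster.KMeans on the same data and compare the cluster centers and inertia; the labels may come out in a different order, since cluster numbering is arbitrary.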
Conclusion
If you have questions or comments, tweet @kenzotakahashi and I'll be happy to help.