K-Means Clustering from Scratch in Python

Posted by Kenzo Takahashi on Tue 19 January 2016

K-means is the most popular clustering algorithm. The basic idea is that it places samples in a high-dimensional space according to their attributes and groups together samples that are close to each other. It's easy to understand because the math involved is not complicated.

If you are studying machine learning on Andrew Ng's Coursera course but don't like Matlab/Octave, this post is for you. K-means is introduced in week 8 of his course. Lecture notes 7a on the Stanford website are also helpful.


Before We Get Started

For this tutorial, I assume you know the following:

  • Python (list comprehensions, basic OOP)
  • NumPy (broadcasting)
  • Basic Linear Algebra
  • Basic machine learning concepts

My code follows the scikit-learn style. If you are unfamiliar with scikit-learn, I recommend you check out the website. I also briefly mention it in my post, K-Nearest Neighbor from Scratch in Python.

I'm using Python 3. If you want to use Python 2, add this line at the beginning of your file and everything should work fine.

from __future__ import division

Initial Code

Our k-means class takes 3 parameters: the number of clusters, the maximum number of iterations, and the random state.

import numpy as np

class KMeans(object):
    def __init__(self, n_clusters=8, max_iter=300, random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.random_state = random_state

Exercise 1

The first thing we need to do is initialize the cluster centers by randomly selecting k samples. This is actually not as simple as it might seem; there are a lot of ways to do it:

def fit(self, X):
    if self.random_state:
        np.random.seed(self.random_state)
    # Your code here
    return self

X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1).fit(X)
print(kmeans.cluster_centers_)

Our toy example consists of 5 samples with 2 attributes each. In this example, n_clusters=2, so fit randomly picks 2 samples from X as the initial cluster centers.

[[2 2]
 [1 2]]

Solution

The best way I found is to use np.random.permutation:

initial = np.random.permutation(X.shape[0])[:self.n_clusters]
self.cluster_centers_ = X[initial]
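
Another option, if you'd rather not permute the whole index range, is np.random.choice with replace=False. Just a possible alternative: with the same seed it won't pick the same samples as the permutation version, so the outputs in the rest of this post assume the permutation approach.

# alternative initialization: sample k distinct row indices directly
initial = np.random.choice(X.shape[0], size=self.n_clusters, replace=False)
self.cluster_centers_ = X[initial]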

Exercise 2

Before we continue with fit, we need to write a few helper methods. We want to measure the distance between a cluster center and a sample using Euclidean distance:

$$d(p, q) = \sqrt{\displaystyle\sum_{i=1}^{n} (p_i - q_i)^2}$$

def _distance(self, a, b):
    # Your code here

kmeans = KMeans()
print(kmeans._distance(np.array([1,-1]), np.array([3, 3])))

Output:

4.472135955

which is \(\sqrt{(1 - 3)^2 + (-1 - 3)^2}\).


Solution

def _distance(self, a, b):
    return np.sqrt(((a - b)**2).sum())
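
An equivalent one-liner uses np.linalg.norm, which computes the Euclidean (L2) norm of the difference vector:

def _distance(self, a, b):
    # same result as the explicit square-root-of-sum-of-squares above
    return np.linalg.norm(a - b)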

Exercise 3

Next, we are going to write a method to choose the nearest cluster given a sample. Your code should return the index of the nearest cluster, not the cluster itself:

def _nearest(self, clusters, x):
    # Your code

kmeans = KMeans()
print(kmeans._nearest(np.array([[1,2],[2,2]]), np.array([4,5])))

The output is 1 because [4,5] is closer to [2,2] than to [1,2].


Solution

You can use numpy.argmin:

def _nearest(self, clusters, x):
    return np.argmin([self._distance(x, c) for c in clusters])
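
Since the prerequisites mention broadcasting, here is a vectorized sketch that computes all the distances at once instead of looping in Python. It's just an alternative; the rest of the post keeps the loop version:

def _nearest(self, clusters, x):
    # np.asarray accepts either a 2-D array or a list of 1-D centers;
    # subtracting x broadcasts it across every row (one row per center)
    distances = np.sqrt(((np.asarray(clusters) - x)**2).sum(axis=1))
    return np.argmin(distances)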

Iteration

So far our k-means looks like this:

class KMeans(object):
    def __init__(self, n_clusters=8, max_iter=300, random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.random_state = random_state

    def fit(self, X):
        if self.random_state:
            np.random.seed(self.random_state)
        initial = np.random.permutation(X.shape[0])[:self.n_clusters]
        self.cluster_centers_ = X[initial]
        return self

    def _nearest(self, clusters, x):
        return np.argmin([self._distance(x, c) for c in clusters])

    def _distance(self, a, b):
        return np.sqrt(((a - b)**2).sum())

Exercise 4

Let's go back to fit. Inside the for loop is where the actual learning happens. self.labels_ holds, for each sample, the index of its nearest cluster center. Group the sample indices by cluster:

def fit(self, X):
    if self.random_state:
        np.random.seed(self.random_state)
    initial = np.random.permutation(X.shape[0])[:self.n_clusters]
    self.cluster_centers_ = X[initial]
    for _ in range(self.max_iter):
        self.labels_ = [self._nearest(self.cluster_centers_, x) for x in X]
        # Your code here
        print(self.cluster_centers_)
        print(indices)
        return

X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1)
kmeans.fit(X)

[2,2], [4,5], and [5,4] are closest to the first cluster center [2,2], so their indices go in the first list. Similarly, [1,1] and [1,2] are closest to the second cluster center [1,2]:

[[2 2]
 [1 2]]
[[2, 3, 4], [0, 1]]

Solution

My first attempt was this:

indices = [[] for _ in range(self.n_clusters)]
for i, l in enumerate(self.labels_):
    indices[l].append(i)

After that I found another solution using list comprehension:

indices = [[i for i, l in enumerate(self.labels_) if l == j]
            for j in range(self.n_clusters)]
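
To convince yourself the comprehension does the right thing, you can run it outside the class with the labels from the toy example:

labels = [1, 1, 0, 0, 0]  # nearest-center index for each of the 5 samples
indices = [[i for i, l in enumerate(labels) if l == j] for j in range(2)]
print(indices)  # [[2, 3, 4], [0, 1]]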

Exercise 5

I added X_by_cluster, which is a list of actual samples grouped by cluster. Now we can update self.cluster_centers_ by averaging these samples:

def fit(self, X):
    if self.random_state:
        np.random.seed(self.random_state)
    initial = np.random.permutation(X.shape[0])[:self.n_clusters]
    self.cluster_centers_ = X[initial]
    for _ in range(self.max_iter):
        self.labels_ = [self._nearest(self.cluster_centers_, x) for x in X]
        indices = [[i for i, l in enumerate(self.labels_) if l == j]
                    for j in range(self.n_clusters)]
        X_by_cluster = [X[i] for i in indices]
        self.cluster_centers_ = # Your code here
        print(self.cluster_centers_)
        return

X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1)
kmeans.fit(X)

Output:

[array([ 3.66666667,  3.66666667]), array([ 1. ,  1.5])]

Solution

self.cluster_centers_ = [c.sum(axis=0) / len(c) for c in X_by_cluster]
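
Since each c in X_by_cluster is a 2-D NumPy array of samples, summing over axis 0 and dividing by the count is just the column-wise mean, so an equivalent alternative is:

# mean over the rows (samples) of each cluster
self.cluster_centers_ = [c.mean(axis=0) for c in X_by_cluster]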

Exercise 6

When using an optimization algorithm like gradient descent, there is an explicit error function that it tries to minimize. K-means doesn't run an optimizer like that, but it still has a natural measure of error called inertia: the sum of squared distances from each sample to its closest cluster center. The smaller the inertia, the better.
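
In symbols, using \(m\) for the number of samples, \(x_i\) for a sample, and \(\mu_{l_i}\) for its closest cluster center (this notation is just for the explanation here, not from the course):

$$\text{inertia} = \displaystyle\sum_{i=1}^{m} \lVert x_i - \mu_{l_i} \rVert^2$$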

def fit(self, X):
    if self.random_state:
        np.random.seed(self.random_state)
    initial = np.random.permutation(X.shape[0])[:self.n_clusters]
    self.cluster_centers_ = X[initial]
    for _ in range(self.max_iter):
        self.labels_ = [self._nearest(self.cluster_centers_, x) for x in X]
        indices = [[i for i, l in enumerate(self.labels_) if l == j]
                    for j in range(self.n_clusters)]
        X_by_cluster = [X[i] for i in indices]
        # update the clusters
        self.cluster_centers_ = [c.sum(axis=0) / len(c) for c in X_by_cluster]
    # sum of square distances from the closest cluster
    self.inertia_ = # Your code here
    return self

X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1)
kmeans.fit(X)
print(kmeans.inertia_)

Output:

2.33333333333

Solution

self.inertia_ = sum(((self.cluster_centers_[l] - x)**2).sum()
                    for x, l in zip(X, self.labels_))

You can also move the code inside the for loop and print it out after each iteration to make sure your code is working.

As the number of clusters k increases, inertia decreases. Eventually, when k equals the sample size, inertia is 0 because every sample becomes its own cluster center. That does not mean you should just keep increasing k, though!
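
You can check this with the toy data by fitting a model for every possible k and printing the inertia; it shrinks as k grows and reaches 0 at k = 5:

X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
for k in range(1, len(X) + 1):
    kmeans = KMeans(n_clusters=k, max_iter=5, random_state=1).fit(X)
    print(k, kmeans.inertia_)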

Also, the score method in scikit-learn returns the negative of the inertia:

def score(self, X):
    return -self.inertia_

Here is the complete code:

import numpy as np

class KMeans(object):
    def __init__(self, n_clusters=8, max_iter=300, random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.random_state = random_state

    def fit(self, X):
        if self.random_state:
            np.random.seed(self.random_state)
        initial = np.random.permutation(X.shape[0])[:self.n_clusters]
        self.cluster_centers_ = X[initial]

        for _ in range(self.max_iter):
            self.labels_ = [self._nearest(self.cluster_centers_, x) for x in X]
            indices = [[i for i, l in enumerate(self.labels_) if l == j]
                        for j in range(self.n_clusters)]
            X_by_cluster = [X[i] for i in indices]
            # update the clusters
            self.cluster_centers_ = [c.sum(axis=0) / len(c) for c in X_by_cluster]
        # sum of square distances from the closest cluster
        self.inertia_ = sum(((self.cluster_centers_[l] - x)**2).sum()
                            for x, l in zip(X, self.labels_))
        return self

    def _nearest(self, clusters, x):
        return np.argmin([self._distance(x, c) for c in clusters])

    def _distance(self, a, b):
        return np.sqrt(((a - b)**2).sum())

    def predict(self, X):
        return self.labels_

    def transform(self, X):
        return [[self._distance(x, c) for c in self.cluster_centers_] for x in X]

    def fit_predict(self, X):
        return self.fit(X).predict(X)

    def fit_transform(self, X):
        return self.fit(X).transform(X)

    def score(self, X):
        return -self.inertia_

predict just returns self.labels_ (so, unlike scikit-learn's version, it ignores its argument and only makes sense for the training data). transform maps the original samples into cluster-distance space: each dimension of a transformed sample is the distance to a cluster center instead of an attribute. fit_predict and fit_transform are just shortcuts.
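
Here is a quick sketch of how these extra methods are called on the toy data from earlier (output omitted):

X = np.array([[1,1],[1,2],[2,2],[4,5],[5,4]])
kmeans = KMeans(n_clusters=2, max_iter=5, random_state=1)
print(kmeans.fit_predict(X))   # cluster index of each training sample
print(kmeans.transform(X)[0])  # distances from the first sample to both centers
print(kmeans.score(X))         # negative inertia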

Conclusion

If you have questions or comments, tweet @kenzotakahashi and I'll be happy to help.