ashwins-code

Deep Learning Library From Scratch 5: Automatic Differentiation Continued

Hi Guys! Welcome to part 5 of this series of building a deep learning library from scratch. This post will cover the code of the automatic differentiation part of the library. Automatic Differentiation was discussed in the previous post, so do check it out if you don't know what Autodiff is.

The github repo for this series is....

ashwins-code / Zen-Deep-Learning-Library

Zen - Deep Learning Library

A deep learning library written in Python, containing the code for my blog series where we build this library from scratch.

  • mnist.py contains an example of an MNIST digit recogniser using the library
  • rnn.py contains an example of a Recurrent Neural Network which learns to fit the graph of sin(x) * cos(x)



Approach

Automatic Differentiation relies on a computation graph to calculate derivatives.

Ultimately, it just boils down to nodes with some connections and we traverse these nodes in some way to calculate derivatives.

For our library, we will build our computation graph on the fly, meaning any calculations performed would be recorded onto the computation graph.

Once we have the graph, we need to find a way to use it to calculate the derivatives of all the variables in that graph.

For example, say we produced the following graph...

[Image: a computation graph where nodes a and b feed into node c, and nodes c and d feed into node e]

This represents...

$c = a + b$
$e = c \cdot d$

Now, using the graph, our aim is to find the derivative of e with respect to all the variables in that graph (a, b, c, d, e).

For my implementation, I found it easiest to traverse the graph in a depth-first manner to calculate derivatives.

So firstly, we start at e and find $\frac{de}{de}$, which is just 1.

Then, we look at node c, meaning we now need to calculate $\frac{de}{dc}$. We can see that $e$ is the result of a multiplication between $c$ and $d$, meaning $\frac{de}{dc} = d$ (since we treat everything apart from the variable we are on as a constant).

Remembering we are traversing depth-first, the next node we move onto is node a, meaning we calculate $\frac{de}{da}$. This is a bit more tricky, since a does not have a direct connection to e. However, using the chain rule, we know that $\frac{de}{da} = \frac{de}{dc}\frac{dc}{da}$. We just calculated $\frac{de}{dc}$, so all we need to calculate now is $\frac{dc}{da}$. We can see that c is an addition of a and b, so $\frac{dc}{da} = 1$, giving $\frac{de}{da} = \frac{de}{dc}\frac{dc}{da} = d \cdot 1 = d$.

Hopefully, you can now see how we would use a graph to find the derivatives of all the variables in that graph.
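To make the traversal concrete, here is the same set of derivatives written out with some assumed values of my own (not from the graph above): let $a = 2$, $b = 3$, $d = 4$, so $c = a + b = 5$ and $e = c \cdot d = 20$. Then $\frac{de}{de} = 1$, $\frac{de}{dc} = d = 4$, $\frac{de}{dd} = c = 5$, and by the chain rule $\frac{de}{da} = \frac{de}{db} = \frac{de}{dc} \cdot 1 = 4$.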

Tensor Class

Firstly, we need to create our tensor class, which would act as the variable nodes on our graph.

import numpy as np
import string
import random

def id_generator(size=10, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


np.seterr(invalid='ignore')

def is_matrix(o):
    return type(o) == np.ndarray

def same_shape(s1, s2):
    for a, b in zip(s1, s2):
        if a != b:
            return False

    return True

class Tensor:
    __array_priority__ = 1000
    def __init__(self, value, trainable=True):
        self.value = value
        self.dependencies = []
        self.grads = []
        self.grad_value = None
        self.shape = 0
        self.matmul_product = False
        self.gradient = 0
        self.trainable = trainable
        self.id = id_generator()

        if is_matrix(value):
            self.shape = value.shape

What's going on here?

def id_generator(size=10, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))

A function that generates a (practically) unique id from random characters.

def is_matrix(o):
    return type(o) == np.ndarray

A simple function that checks whether a value is a numpy array.

self.value = value

This line should be self-explanatory: it just holds the value that is given to the tensor.

self.dependencies = [] 

If the tensor was the result of an operation, e.g. add or divide, this property holds the list of tensors that were involved in producing it (this is how the computation graph is built). If the tensor is not the result of any operation, this list stays empty.

self.grads = []

This property holds the derivative of the tensor with respect to each of its dependencies (its local gradients), stored in the same order as the dependencies list.
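As a quick illustration of how dependencies and grads fit together (a sketch of the behaviour, using the operator overloads defined later in this post):

a = Tensor(2.0)
b = Tensor(3.0)
c = a * b

print(c.dependencies)   # [Tensor (2.0), Tensor (3.0)] -> the graph edges back to a and b
print(c.grads)          # [3.0, 2.0] -> dc/da is b's value, dc/db is a's value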

self.shape = 0
...
if is_matrix(value):
    self.shape = value.shape

self.shape holds the shape of the tensor's value. Only numpy arrays can have a shape, which is why it is 0 by default.

self.matmul_product = False

Specifies whether the tensor was the result of a matrix multiplication or not (this will help later, since the chain rule works differently for matrix multiplication).

self.gradient = 0

After we use the computation graph to calculate gradients, this property holds the gradient calculated for the tensor. It starts at 0 so that gradients can be accumulated into it.

self.trainable = trainable

Some nodes on the graph (such as constants) do not need their derivatives to be calculated, so this property specifies whether gradients should be computed for this tensor.

self.id = id_generator()

Tensors will need something to uniquely identify themselves. We will see this come into use when we reimplement our optimisers to use this automatic differentiation module in a later post.

Operations with Tensors

class Tensor:
    __array_priority__ = 1000
    def __init__(self, value, trainable=True):
        self.value = value
        self.dependencies = []
        self.grads = []
        self.grad_value = None
        self.shape = 0
        self.matmul_product = False
        self.gradient = 0
        self.trainable = trainable
        self.id = id_generator()

        if is_matrix(value):
            self.shape = value.shape

    def depends_on(self, target):
        if self == target:
            return True

        dependencies = self.dependencies

        for dependency in dependencies:
            if dependency == target:
                return True
            elif dependency.depends_on(target):
                return True

        return False

    def __mul__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value * other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(other.value)
        var.grads.append(self.value)
        return var

    def __rmul__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value * other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(other.value)
        var.grads.append(self.value)
        return var

    def __add__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value + other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(np.ones_like(self.value))
        var.grads.append(np.ones_like(other.value))
        return var

    def __radd__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value + other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(np.ones_like(self.value))
        var.grads.append(np.ones_like(other.value))
        return var

    def __sub__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value - other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(np.ones_like(self.value))
        var.grads.append(-np.ones_like(other.value))
        return var

    def __rsub__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(other.value - self.value)
        var.dependencies.append(other)
        var.dependencies.append(self)
        var.grads.append(np.ones_like(other.value))
        var.grads.append(-np.ones_like(self.value))
        return var

    def __pow__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value ** other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)

        grad_wrt_self = other.value * self.value ** (other.value - 1)
        var.grads.append(grad_wrt_self)

        grad_wrt_other = (self.value ** other.value) * np.log(self.value)
        var.grads.append(grad_wrt_other)

        return var

    def __rpow__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(other.value ** self.value)
        var.dependencies.append(other)
        var.dependencies.append(self)

        grad_wrt_other = self.value * other.value ** (self.value - 1)
        var.grads.append(grad_wrt_other)

        grad_wrt_self = (other.value ** self.value) * np.log(other.value)
        var.grads.append(grad_wrt_self)

        return var


    def __truediv__(self, other):
        return self * (other ** -1)

    def __rtruediv__(self, other):
        return other * (self ** -1)

    def __matmul__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value @ other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(other.value.T)
        var.grads.append(self.value.T)

        var.matmul_product = True
        return var

    def __rmatmul__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(other.value @ self.value)
        var.dependencies.append(other)
        var.dependencies.append(self)
        var.grads.append(self.value.T)
        var.grads.append(other.value.T)

        var.matmul_product = True

        return var

To understand what is happening here, let's look at one of the methods

def __mul__(self, other):
    if not (isinstance(other, Tensor)):
        other = Tensor(other, trainable=False)

    var = Tensor(self.value * other.value)
    var.dependencies.append(self)
    var.dependencies.append(other)
    var.grads.append(other.value)
    var.grads.append(self.value)
    return var

Firstly, __mul__ overloads the multiplication operator. This just means that whenever we multiply a tensor with something else, this method is called. You also see __rmul__, which does the same thing but is called when the Tensor object is on the right-hand side of the operation.

For example...

t = Tensor(10)
t * 5 #__mul__ is called
5 * t #__rmul__ is called

if not (isinstance(other, Tensor)):
    other = Tensor(other, trainable=False)

other represents the thing that this tensor is being multiplied with. If other is not a tensor, we convert it into a tensor holding the value of other. Since other was not already a tensor, it must be a constant, so we do not need to calculate its derivative; we only want the derivative of something if we need to change its value, usually when training a model. This is why trainable=False.
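For example (a small sketch of my own), multiplying a tensor by a plain number wraps that number in a non-trainable tensor behind the scenes:

t = Tensor(np.array([1.0, 2.0]))
out = t * 3   # 3 gets wrapped as Tensor(3, trainable=False)

print(out.dependencies[1].trainable)   # False -> no gradient will be accumulated for the constant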

var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)

var holds the resulting tensor of this operation. The second and third lines add the two tensors used in this operation to var's dependencies (look back up if you forgot what the dependencies property is used for).

var.grads.append(other.value) # dvar/dother
var.grads.append(self.value) # dvar/dself
return var

This now adds the derivatives of var with respect to the two operands to var's grads. These lines would obviously be different in the other class methods, since the derivatives depend on the operation being applied (in this case it's multiplication). Note how the order in which they're added corresponds to the order of the tensors in the dependencies property. We don't store them as tensors, since it is much quicker to calculate derivatives using raw values instead of our tensor class.

Calculating gradients using the graph!

def get_gradients(self, grad = None):
    grad = np.ones_like(self.value) if grad is None else grad
    grad = np.float32(grad)

    for dependency, _grad in zip(self.dependencies, self.grads):
        if dependency.trainable:
            local_grad = np.float32(_grad)

            if self.matmul_product:
                if dependency == self.dependencies[0]:
                    local_grad = grad @ local_grad
                else:
                    local_grad = local_grad @ grad
            else:
                if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
                    ndims_added = grad.ndim - local_grad.ndim
                    for _ in range(ndims_added):
                        grad = grad.sum(axis=0)

                    for i, dim in enumerate(dependency.shape):
                        if dim == 1:
                            grad = grad.sum(axis=i, keepdims=True)

                local_grad = local_grad * np.nan_to_num(grad)

            dependency.gradient += local_grad
            dependency.get_gradients(local_grad)

This method recursively implements the depth-first derivative calculation outlined earlier. It can be called on any tensor in the graph, not just the top one, and results in the derivatives of all the tensors that were used to produce this tensor being calculated.

grad holds the incoming gradient, i.e. the gradient calculated at the previous "level" of the graph.
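Since any tensor carries its own dependencies, we can also call get_gradients part-way down the graph. A small sketch (the values are my own), using the same a, b, c, d, e graph from earlier:

a, b, d = Tensor(2.0), Tensor(3.0), Tensor(4.0)
c = a + b
e = c * d

c.get_gradients()     # only differentiates the sub-graph that produced c
print(a.gradient)     # 1.0 -> dc/da
print(d.gradient)     # 0   -> d played no part in producing c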

for dependency, _grad in zip(self.dependencies, self.grads):
    if dependency.trainable:
        local_grad = np.float32(_grad)

        if self.matmul_product:
            if dependency == self.dependencies[0]:
                local_grad = grad @ local_grad
            else:
                local_grad = local_grad @ grad
        else:
            if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
                ndims_added = grad.ndim - local_grad.ndim
                for _ in range(ndims_added):
                    grad = grad.sum(axis=0)

                for i, dim in enumerate(dependency.shape):
                    if dim == 1:
                        grad = grad.sum(axis=i, keepdims=True)

            local_grad = local_grad * np.nan_to_num(grad)

This part of the code goes through each of the tensor's dependencies along with the derivative of the tensor with respect to that dependency (held in _grad). If the dependency is trainable, it then checks whether the tensor was the result of a matrix multiplication or not.

The chain rule is then accordingly applied, using the previously calculated gradient grad and the gradient of the current dependency _grad. local_grad stores the result after the chain rule is applied.
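For the matrix multiplication case, the two branches of the if correspond to the usual matrix calculus identities: for Y = X @ W with incoming gradient G, the gradient flowing to X is G @ W.T and the gradient flowing to W is X.T @ G. A quick numerical sanity check (my own, not part of the original post):

import numpy as np

X = Tensor(np.random.randn(2, 3))
W = Tensor(np.random.randn(3, 4))
Y = X @ W

Y.get_gradients()                     # incoming gradient defaults to a matrix of ones
G = np.ones_like(Y.value)

print(np.allclose(X.gradient, G @ W.value.T))   # True
print(np.allclose(W.gradient, X.value.T @ G))   # True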

def same_shape(s1, s2):
    for a, b in zip(s1, s2):
        if a != b:
            return False

    return True

if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
    ndims_added = grad.ndim - local_grad.ndim
    for _ in range(ndims_added):
        grad = grad.sum(axis=0)

    for i, dim in enumerate(dependency.shape):
        if dim == 1:
            grad = grad.sum(axis=i, keepdims=True)

If we focus on this part of the code, it handles any case where local_grad and grad aren't the same shape (they need to be in order for the chain rule to be applied). This shape mismatch arises if any broadcasting was performed in any of the calculations. Broadcasting is the term used to describe how numpy performs operations involving arrays of different shapes. You can read more about it in the numpy docs. All this part of the code does is sum grad across the broadcast axes, in order to reduce its shape to match the shape of local_grad.
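As an illustration (a sketch with values of my own), adding a (3,) bias to a (2, 3) matrix broadcasts the bias across the rows, so its gradient has to be summed back down to shape (3,):

X = Tensor(np.ones((2, 3)))
b = Tensor(np.ones(3))
Y = X + b

Y.get_gradients()
print(X.gradient.shape)   # (2, 3)
print(b.gradient.shape)   # (3,)  -> the broadcast axis was summed away
print(b.gradient)         # [2. 2. 2.] -> each bias element was used in 2 rows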

dependency.gradient += local_grad
dependency.get_gradients(local_grad)

The gradient is then accumulated into the gradient property of the dependency. The traversal then continues depth-first by calling the get_gradients method on the dependency, passing through the gradient that was just calculated.
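The += here matters: if a tensor appears more than once in the graph, each path through the graph contributes to its gradient. For instance (my own example):

a = Tensor(3.0)
f = a * a           # f = a^2, so df/da should be 2a = 6

f.get_gradients()
print(a.gradient)   # 6.0 -> 3.0 from each of the two paths back to a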

Overall, our code for autodiff should look like this...

import numpy as np
import string
import random

def id_generator(size=10, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


np.seterr(invalid='ignore')

def is_matrix(o):
    return type(o) == np.ndarray

def same_shape(s1, s2):
    for a, b in zip(s1, s2):
        if a != b:
            return False

    return True

class Tensor:
    __array_priority__ = 1000
    def __init__(self, value, trainable=True):
        self.value = value
        self.dependencies = []
        self.grads = []
        self.grad_value = None
        self.shape = 0
        self.matmul_product = False
        self.gradient = 0
        self.trainable = trainable
        self.id = id_generator()

        if is_matrix(value):
            self.shape = value.shape

    def depends_on(self, target):
        if self == target:
            return True

        dependencies = self.dependencies

        for dependency in dependencies:
            if dependency == target:
                return True
            elif dependency.depends_on(target):
                return True

        return False

    def __mul__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value * other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(other.value)
        var.grads.append(self.value)
        return var

    def __rmul__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value * other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(other.value)
        var.grads.append(self.value)
        return var

    def __add__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value + other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(np.ones_like(self.value))
        var.grads.append(np.ones_like(other.value))
        return var

    def __radd__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value + other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(np.ones_like(self.value))
        var.grads.append(np.ones_like(other.value))
        return var

    def __sub__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value - other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(np.ones_like(self.value))
        var.grads.append(-np.ones_like(other.value))
        return var

    def __rsub__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(other.value - self.value)
        var.dependencies.append(other)
        var.dependencies.append(self)
        var.grads.append(np.ones_like(other.value))
        var.grads.append(-np.ones_like(self.value))
        return var

    def __pow__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value ** other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)

        grad_wrt_self = other.value * self.value ** (other.value - 1)
        var.grads.append(grad_wrt_self)

        grad_wrt_other = (self.value ** other.value) * np.log(self.value)
        var.grads.append(grad_wrt_other)

        return var

    def __rpow__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(other.value ** self.value)
        var.dependencies.append(other)
        var.dependencies.append(self)

        grad_wrt_other = self.value * other.value ** (self.value - 1)
        var.grads.append(grad_wrt_other)

        grad_wrt_self = (other.value ** self.value) * np.log(other.value)
        var.grads.append(grad_wrt_self)

        return var


    def __truediv__(self, other):
        return self * (other ** -1)

    def __rtruediv__(self, other):
        return other * (self ** -1)

    def __matmul__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(self.value @ other.value)
        var.dependencies.append(self)
        var.dependencies.append(other)
        var.grads.append(other.value.T)
        var.grads.append(self.value.T)

        var.matmul_product = True
        return var

    def __rmatmul__(self, other):
        if not (isinstance(other, Tensor)):
            other = Tensor(other, trainable=False)

        var = Tensor(other.value @ self.value)
        var.dependencies.append(other)
        var.dependencies.append(self)
        var.grads.append(self.value.T)
        var.grads.append(other.value.T)

        var.matmul_product = True

        return var

    def grad(self, target, grad = None):
        grad = np.ones_like(self.value) if grad is None else grad
        grad = np.float32(grad)

        if not self.depends_on(target):
            return 0

        if self == target:
            return grad

        final_grad = 0

        for dependency, _grad in zip(self.dependencies, self.grads):
            depends = dependency.depends_on(target)
            local_grad = np.float32(_grad) if depends else 0

            if depends:
                if self.matmul_product:                
                    if dependency == self.dependencies[0]:
                        local_grad = grad @ local_grad
                    else:
                        local_grad = local_grad @ grad
                else:
                    if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
                        ndims_added = grad.ndim - local_grad.ndim
                        for _ in range(ndims_added):
                            grad = grad.sum(axis=0)

                        for i, dim in enumerate(dependency.shape):
                            if dim == 1:
                                grad = grad.sum(axis=i, keepdims=True)

                    local_grad *= grad

            final_grad += dependency.grad(target, local_grad)

        return final_grad


    def get_gradients(self, grad = None):
        grad = np.ones_like(self.value) if grad is None else grad
        grad = np.float32(grad)

        for dependency, _grad in zip(self.dependencies, self.grads):
            if dependency.trainable:
                local_grad = np.float32(_grad)

                if self.matmul_product:                
                    if dependency == self.dependencies[0]:
                        local_grad = grad @ local_grad
                    else:
                        local_grad = local_grad @ grad
                else:
                    if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
                        ndims_added = grad.ndim - local_grad.ndim
                        for _ in range(ndims_added):
                            grad = grad.sum(axis=0)

                        for i, dim in enumerate(dependency.shape):
                            if dim == 1:
                                grad = grad.sum(axis=i, keepdims=True)

                    local_grad = local_grad * np.nan_to_num(grad)


                dependency.gradient += local_grad
                dependency.get_gradients(local_grad)

    def __repr__(self):
        return f"Tensor ({self.value})"

and it can be used like so...

a = Tensor(10)
b = Tensor(5)
c = 2
d = (a*b)**c
d.get_gradients()

print (a.gradient, b.gradient)
OUTPUT:
500.0 1000.0
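The full listing also contains a grad method, which wasn't walked through above: instead of accumulating into every tensor's gradient property, it walks the graph and returns the derivative with respect to a single target tensor. Using the same tensors as above:

print(d.grad(a))   # 500.0
print(d.grad(b))   # 1000.0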

Thank you

Thank you for reading through all of this post! The code of this post can be seen in the Github repo linked at the start in autodiff.py.
