DEV Community

Julian
Julian

Posted on • Updated on

Network Automation with Python and Paramiko

In this tutorial I'll walk you through how you can automate tasks in your network devices through SSH using Python and the Paramiko library. Paramiko is a Python implementation for SSH that allow us to connect to devices and execute commands on them, saving us time and reducing human errors when performing tasks.

In this particular example, we'll configure DHCP snooping in every switch in a network.

DHCP Snooping

DHCP snooping is a layer 2 security protocol that drops DHCP traffic on unauthorized interfaces, protecting the network from rogue DHCP servers. While protecting the network from rogue DHCP servers, you need to be able to accept the authorized DHCP servers too, so you need to specify the trusted interfaces where the authorized traffic is coming from. The trusted interfaces usually are the Uplinks of every switch, but not every switch in the network has the interfaces correctly tagged or identified so, how do we know what port is the uplink in a switch? The answer resides in ARP Protocol.

Scenario

Lets have a look at this diagram:

Image description

In almost every network (at least on all of the networks I've worked with) the core of the network is the gateway of everything (servers, network devices, VoIP devices etc...) So it means that if I want to reach the internet or any other network, the traffic will flow through the network devices' uplink interfaces. Knowing this, and knowing the IP of the default gateway, we can determine our uplink interfaces using the ARP Protocol.

We'll use the ARP protocol to find the MAC Address of the default gateway and then, you'll find the port where that MAC address is learned from and that port is your Uplink. Some network devices provide the port information in the ARP table and you can save time on the lookup in the MAC table.

Let's have a look at the diagram once again:

Image description

We now have identified our Uplink interfaces and also, we have a not authorized Wireless router with DHCP activated connected to our network (a very common case) that some user brought from home to use connect some wireless device to the corporate network.

This device will inject DHCP in our network devices causing corporate devices to get IPs from the wireless router's network instead of the corporate one. To prevent this from happening, we just need to activate DHCP snooping and trust the uplink interfaces in every switch so the authorized DHCP server can still assign IPs to corporate network devices.

Code

From the previous section, we know what we need in order to configure DHCP snooping correctly in every network device:

  1. It's Uplink port: Using the ARP table and MAC table (or just the ARP table in some network devices)
  2. Activate DHCP Snooping: Using the devices CLI commands (consult your network device manual for that)

To get the Uplink port and configure DHCP snooping, I followed the following steps:

  1. Connect to the device
  2. Get the MAC of the default gateway from the ARP table
  3. Get the port where the MAC of the default gateway is learned from using the MAC table
  4. Execute the command to activate DHCP Snooping and set the Uplink as a trusted interface.

To do that, I created the following functions to make the code easier to create:

import paramiko
import time
import re

'''
This function creates a SSH connection to a Device.
host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}
'''


def create_connection(host):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(**host, look_for_keys=False, allow_agent=False)
    cli = ssh.invoke_shell()
    return ssh, cli


'''This function reads the output of the cli of a connection
I created this for the sole purpose of having a shorter way
to write the read command on the rest of the code.
'''


def read_output(cli):
    message = cli.recv(1000000000000000000000000).decode('utf-8')
    return message


'''
This function makes a lookup on the MAC Address Table
for the port where the provided mac address is learned from.

host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}

You need to know how to find a mac addres in the target device.

The MAC address needs to be provided in the same format the switch supports.

The search command should be written as you would do on the console just without the MAC.

The port_regexp is a regluar expression that needs to match the names of all the possible interface
names on the switch.
'''


def get_port_mac(host, mac, search_command, port_regexp):
    try:
        ssh, cli = create_connection(host)
        cli.send(f'{search_command} {mac}\n')
        time.sleep(5)
        message = read_output(cli)
        port = re.findall(port_regexp, message, flags=re.IGNORECASE)
        ssh.close()
    except IndexError:
        print(host['hostname'] + " not available\n")
    return port[0]


'''
This function helps us to get the uplink of a network device using 
the gateway IP and ARP for this.

host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}

The arp_search_command should be the search command you type o the device console when
searching for an ARP record, just without the IP.

The mac_search_command should be written as you would do on the console just without the MAC.

The port_regexp is a regluar expression that needs to match the names of all the possible interface
names on the switch.

What happens here is basically we add 1 command to the search and reuse the get_mac_port function
'''


def get_uplink(host, gateway, arp_search_command, mac_search_command, port_regexp):
    try:
        ssh, cli = create_connection(host)
        cli.send(f'{arp_search_command} {gateway}\n')
        time.sleep(5)
        arp = read_output(cli)
        mac = re.findall('(?:[0-9A-Fa-f]{2}[:-]){5}(?:[0-9A-Fa-f]{2})', arp)
        ssh.close()
        port = get_port_mac(host, mac[0], mac_search_command, port_regexp)
    except IndexError:
        print(host['hostname'] + " not available\n")
    return port


'''
This function executes a single command directly on the target device and returns
the console output.

host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}

Command should be a string
'''


def execute_command(host, command):
    try:
        ssh, cli = create_connection(host)
        cli.send(command + "\n")
        time.sleep(5)
        message = read_output(cli)
        ssh.close()
    except Exception as e:
        print(f"Error at: {host} ------> {e}".format(hostname=host, error=e))
    return message


'''
This function executes a list of commands on the target device.

host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}

commands should be a list of strings
'''


def execute_bulk_commands(host, commands):
    try:
        ssh, cli = create_connection(host)
        for command in commands:
            cli.send(command + "\n")
            time.sleep(2)
        ssh.close()
    except Exception as e:
        print(f"Error at: {host} ------> {e}".format(hostname=host, error=e))

Enter fullscreen mode Exit fullscreen mode

Note: you need to know very well your network device's CLI so you can elaborate the regexp patterns and the search commands.

With this functions, I created a script to configure the devices on my network. In my case, I have two different kind of switches in my network. I have some FortiSwitches and Cisco SF300 so my regexp patterns and show commands probably will be different to what you'll need.

The code that resulted from my needs is the following:

import threading
import net_tools

'''
Creating all of the parameter for each device brand
in my case I had these but they could be different in your case.
'''

cisco_switches = ["cisco_switch_01", "cisco_switch_02"]
cisco_switches_arp_search = "show arp ip-address "
cisco_switches_mac_search = "show mac address-table address "
cisco_switches_port_regexp = "gi[0123456789].{1,5}|fa[0123456789].{1,5}"

fortiswitches = ["fortiswitch_01", "fotiswitch_02"]
fortiswitches_arp_search = "diagnose ip arp list | grep "
fortiswitches_mac_search = "diagnose switch mac-address list | grep "
fortiswitches_port_regexp = "port[0-9]{1,2}"

# Creating the connection information. We´ll substitute the hostname later

connection_info = {'hostname': "",
                   'port': '22',
                   'username': 'username',
                   'password': 'password'}

gateway = "172.20.90.1"

cisco_switches_threads = list()
for hostname in cisco_switches:
    # We substitute the value of the hostname in the dictionary to match the target device
    # and we create a thread for each switch so the commands can be executed faster using multi-threading
    connection_info['hostname'] = hostname
    uplink_port = net_tools.get_uplink(connection_info, gateway, cisco_switches_arp_search,
                                       cisco_switches_mac_search, cisco_switches_port_regexp)
    cisco_switches_commands = ["enable",
                               "configure terminal",
                               "ip dhcp snooping",
                               "ip dhcp snooping database",
                               "ip dhcp snooping vlan 1",
                               f"interface {uplink_port}",
                               "ip dhcp snooping trust",
                               "end",
                               "wr",
                               "Y"]
    th = threading.Thread(target=net_tools.execute_bulk_commands, args=(connection_info, cisco_switches_commands))
    cisco_switches_threads.append(th)

fortiswitches_threads = list()
for hostname in fortiswitches:
    # We substitute the value of the hostname in the dictionary to match the target device
    # and we create a thread for each switch so the commands can be executed faster using multi-threading
    connection_info['hostname'] = hostname
    uplink_port = net_tools.get_uplink(connection_info, gateway, fortiswitches_arp_search,
                                       fortiswitches_mac_search, fortiswitches_port_regexp)
    fortiswitches_commands = ["config switch vlan",
                              "edit 1",
                              "set dhcp-snooping enable",
                              "end",
                              "config switch interface",
                              f"edit {uplink_port}",
                              "set dhcp-snooping trusted",
                              "end"]
    th = threading.Thread(target=net_tools.execute_bulk_commands, args=(connection_info, fortiswitches_commands))
    fortiswitches_threads.append(th)


# We start our threads
for thread in cisco_switches_threads:
    thread.start()

for thread in fortiswitches_threads:
    thread.start()

# We wait for all of them to finish
for thread in cisco_switches_threads:
    th.join()

for thread in fortiswitches_threads:
    th.join()
Enter fullscreen mode Exit fullscreen mode

You can find the files in my GitHub repository if you want to do something else on top of that, basically all you need to know is how to do it manually in the target devices and automate it!.

If you have any doubts, contact me ans I'll answer you as soon as possible.

Hope this post help some fellow network engineer with difficulties!

Top comments (2)

Collapse
 
fayenuwoayodele profile image
FAYENUWO AYODELE MOSES

What of using python with netmiko.
I think netmiko is more easier to use.
What's your opinion.

Collapse
 
julianalmanzar profile image
Julian

I agree with you, in fact, Netmiko is an implementation of Paramiko optimized for network devices. It would be better at the end but you'll get the same results and the code complexity won't change a lot.