"""NAPALM Cisco IOS Handler.""" # Copyright 2015 Spotify AB. All rights reserved. # # The contents of this file are licensed under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with the # License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. from __future__ import print_function from __future__ import unicode_literals import re import os import uuid import socket import tempfile import telnetlib import copy from netmiko import ConnectHandler, FileTransfer, InLineTransfer from napalm_base.base import NetworkDriver from napalm_base.exceptions import ReplaceConfigException, MergeConfigException, \ ConnectionClosedException, CommandErrorException from napalm_base.utils import py23_compat import napalm_base.constants as C import napalm_base.helpers # Easier to store these as constants HOUR_SECONDS = 3600 DAY_SECONDS = 24 * HOUR_SECONDS WEEK_SECONDS = 7 * DAY_SECONDS YEAR_SECONDS = 365 * DAY_SECONDS # STD REGEX PATTERNS IP_ADDR_REGEX = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}" IPV4_ADDR_REGEX = IP_ADDR_REGEX IPV6_ADDR_REGEX_1 = r"::" IPV6_ADDR_REGEX_2 = r"[0-9a-fA-F:]{1,39}::[0-9a-fA-F:]{1,39}" IPV6_ADDR_REGEX_3 = r"[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:" \ "[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}" # Should validate IPv6 address using an IP address library after matching with this regex IPV6_ADDR_REGEX = "(?:{}|{}|{})".format(IPV6_ADDR_REGEX_1, IPV6_ADDR_REGEX_2, IPV6_ADDR_REGEX_3) MAC_REGEX = r"[a-fA-F0-9]{4}\.[a-fA-F0-9]{4}\.[a-fA-F0-9]{4}" VLAN_REGEX = r"\d{1,4}" RE_IPADDR = re.compile(r"{}".format(IP_ADDR_REGEX)) RE_IPADDR_STRIP = re.compile(r"({})\n".format(IP_ADDR_REGEX)) RE_MAC = re.compile(r"{}".format(MAC_REGEX)) # Period needed for 32-bit AS Numbers ASN_REGEX = r"[\d\.]+" IOS_COMMANDS = { 'show_mac_address': ['show mac-address-table', 'show mac address-table'], } class IOSDriver(NetworkDriver): """NAPALM Cisco IOS Handler.""" def __init__(self, hostname, username, password, timeout=60, optional_args=None): """NAPALM Cisco IOS Handler.""" if optional_args is None: optional_args = {} self.hostname = hostname self.username = username self.password = password self.timeout = timeout self.transport = optional_args.get('transport', 'ssh') # Retrieve file names self.candidate_cfg = optional_args.get('candidate_cfg', 'candidate_config.txt') self.merge_cfg = optional_args.get('merge_cfg', 'merge_config.txt') self.rollback_cfg = optional_args.get('rollback_cfg', 'rollback_config.txt') self.inline_transfer = optional_args.get('inline_transfer', False) if self.transport == 'telnet': # Telnet only supports inline_transfer self.inline_transfer = True # None will cause autodetection of dest_file_system self._dest_file_system = optional_args.get('dest_file_system', None) self.auto_rollback_on_error = optional_args.get('auto_rollback_on_error', True) # Netmiko possible arguments netmiko_argument_map = { 'port': None, 'secret': '', 'verbose': False, 'keepalive': 30, 'global_delay_factor': 1, 'use_keys': False, 'key_file': None, 'ssh_strict': False, 'system_host_keys': False, 'alt_host_keys': False, 'alt_key_file': '', 'ssh_config_file': None, 
'allow_agent': False, } # Build dict of any optional Netmiko args self.netmiko_optional_args = {} for k, v in netmiko_argument_map.items(): try: self.netmiko_optional_args[k] = optional_args[k] except KeyError: pass default_port = { 'ssh': 22, 'telnet': 23 } self.port = optional_args.get('port', default_port[self.transport]) self.device = None self.config_replace = False self.interface_map = {} self.profile = ["ios"] def open(self): """Open a connection to the device.""" device_type = 'cisco_ios' if self.transport == 'telnet': device_type = 'cisco_ios_telnet' self.device = ConnectHandler(device_type=device_type, host=self.hostname, username=self.username, password=self.password, **self.netmiko_optional_args) # ensure in enable mode self.device.enable() def _discover_file_system(self): try: return self.device._autodetect_fs() except Exception: msg = "Netmiko _autodetect_fs failed (to workaround specify " \ "dest_file_system in optional_args.)" raise CommandErrorException(msg) def close(self): """Close the connection to the device.""" self.device.disconnect() def _send_command(self, command): """Wrapper for self.device.send.command(). If command is a list will iterate through commands until valid command. """ try: if isinstance(command, list): for cmd in command: output = self.device.send_command(cmd) if "% Invalid" not in output: break else: output = self.device.send_command(command) return self._send_command_postprocess(output) except (socket.error, EOFError) as e: raise ConnectionClosedException(str(e)) def is_alive(self): """Returns a flag with the state of the connection.""" null = chr(0) if self.device is None: return {'is_alive': False} if self.transport == 'telnet': try: # Try sending IAC + NOP (IAC is telnet way of sending command # IAC = Interpret as Command (it comes before the NOP) self.device.write_channel(telnetlib.IAC + telnetlib.NOP) return {'is_alive': True} except UnicodeDecodeError: # Netmiko logging bug (remove after Netmiko >= 1.4.3) return {'is_alive': True} except AttributeError: return {'is_alive': False} else: # SSH try: # Try sending ASCII null byte to maintain the connection alive self.device.write_channel(null) return {'is_alive': self.device.remote_conn.transport.is_active()} except (socket.error, EOFError): # If unable to send, we can tell for sure that the connection is unusable return {'is_alive': False} return {'is_alive': False} @staticmethod def _create_tmp_file(config): """Write temp file and for use with inline config and SCP.""" tmp_dir = tempfile.gettempdir() rand_fname = py23_compat.text_type(uuid.uuid4()) filename = os.path.join(tmp_dir, rand_fname) with open(filename, 'wt') as fobj: fobj.write(config) return filename def _load_candidate_wrapper(self, source_file=None, source_config=None, dest_file=None, file_system=None): """ Transfer file to remote device for either merge or replace operations Returns (return_status, msg) """ return_status = False msg = '' if source_file and source_config: raise ValueError("Cannot simultaneously set source_file and source_config") if source_config: if self.inline_transfer: (return_status, msg) = self._inline_tcl_xfer(source_config=source_config, dest_file=dest_file, file_system=file_system) else: # Use SCP tmp_file = self._create_tmp_file(source_config) (return_status, msg) = self._scp_file(source_file=tmp_file, dest_file=dest_file, file_system=file_system) if tmp_file and os.path.isfile(tmp_file): os.remove(tmp_file) if source_file: if self.inline_transfer: (return_status, msg) = 
self._inline_tcl_xfer(source_file=source_file, dest_file=dest_file, file_system=file_system) else: (return_status, msg) = self._scp_file(source_file=source_file, dest_file=dest_file, file_system=file_system) if not return_status: if msg == '': msg = "Transfer to remote device failed" return (return_status, msg) def load_replace_candidate(self, filename=None, config=None): """ SCP file to device filesystem, defaults to candidate_config. Return None or raise exception """ self.config_replace = True return_status, msg = self._load_candidate_wrapper(source_file=filename, source_config=config, dest_file=self.candidate_cfg, file_system=self.dest_file_system) if not return_status: raise ReplaceConfigException(msg) def load_merge_candidate(self, filename=None, config=None): """ SCP file to remote device. Merge configuration in: copy running-config """ self.config_replace = False return_status, msg = self._load_candidate_wrapper(source_file=filename, source_config=config, dest_file=self.merge_cfg, file_system=self.dest_file_system) if not return_status: raise MergeConfigException(msg) @staticmethod def _normalize_compare_config(diff): """Filter out strings that should not show up in the diff.""" ignore_strings = ['Contextual Config Diffs', 'No changes were found', 'file prompt quiet', 'ntp clock-period'] new_list = [] for line in diff.splitlines(): for ignore in ignore_strings: if ignore in line: break else: # nobreak new_list.append(line) return "\n".join(new_list) @staticmethod def _normalize_merge_diff_incr(diff): """Make the compare config output look better. Cisco IOS incremental-diff output No changes: !List of Commands: end !No changes were found """ new_diff = [] changes_found = False for line in diff.splitlines(): if re.search(r'order-dependent line.*re-ordered', line): changes_found = True elif 'No changes were found' in line: # IOS in the re-order case still claims "No changes were found" if not changes_found: return '' else: continue if line.strip() == 'end': continue elif 'List of Commands' in line: continue # Filter blank lines and prepend +sign elif line.strip(): if re.search(r"^no\s+", line.strip()): new_diff.append('-' + line) else: new_diff.append('+' + line) return "\n".join(new_diff) @staticmethod def _normalize_merge_diff(diff): """Make compare_config() for merge look similar to replace config diff.""" new_diff = [] for line in diff.splitlines(): # Filter blank lines and prepend +sign if line.strip(): new_diff.append('+' + line) if new_diff: new_diff.insert(0, '! incremental-diff failed; falling back to echo of merge file') else: new_diff.append('! No changes specified in merge file.') return "\n".join(new_diff) def compare_config(self): """ show archive config differences . 
Default operation is to compare system:running-config to self.candidate_cfg """ # Set defaults base_file = 'running-config' base_file_system = 'system:' if self.config_replace: new_file = self.candidate_cfg else: new_file = self.merge_cfg new_file_system = self.dest_file_system base_file_full = self._gen_full_path(filename=base_file, file_system=base_file_system) new_file_full = self._gen_full_path(filename=new_file, file_system=new_file_system) if self.config_replace: cmd = 'show archive config differences {} {}'.format(base_file_full, new_file_full) diff = self.device.send_command_expect(cmd) diff = self._normalize_compare_config(diff) else: # merge cmd = 'show archive config incremental-diffs {} ignorecase'.format(new_file_full) diff = self.device.send_command_expect(cmd) if 'error code 5' in diff or 'returned error 5' in diff: diff = "You have encountered the obscure 'error 5' message. This generally " \ "means you need to add an 'end' statement to the end of your merge changes." elif '% Invalid' not in diff: diff = self._normalize_merge_diff_incr(diff) else: cmd = 'more {}'.format(new_file_full) diff = self.device.send_command_expect(cmd) diff = self._normalize_merge_diff(diff) return diff.strip() def _commit_hostname_handler(self, cmd): """Special handler for hostname change on commit operation.""" current_prompt = self.device.find_prompt().strip() terminating_char = current_prompt[-1] pattern = r"[>#{}]\s*$".format(terminating_char) # Look exclusively for trailing pattern that includes '#' and '>' output = self.device.send_command_expect(cmd, expect_string=pattern) # Reset base prompt in case hostname changed self.device.set_base_prompt() return output def commit_config(self): """ If replacement operation, perform 'configure replace' for the entire config. If merge operation, perform copy running-config. """ # Always generate a rollback config on commit self._gen_rollback_cfg() if self.config_replace: # Replace operation filename = self.candidate_cfg cfg_file = self._gen_full_path(filename) if not self._check_file_exists(cfg_file): raise ReplaceConfigException("Candidate config file does not exist") if self.auto_rollback_on_error: cmd = 'configure replace {} force revert trigger error'.format(cfg_file) else: cmd = 'configure replace {} force'.format(cfg_file) output = self._commit_hostname_handler(cmd) if ('original configuration has been successfully restored' in output) or \ ('error' in output.lower()) or \ ('failed' in output.lower()): msg = "Candidate config could not be applied\n{}".format(output) raise ReplaceConfigException(msg) elif '%Please turn config archive on' in output: msg = "napalm-ios replace() requires Cisco 'archive' feature to be enabled." raise ReplaceConfigException(msg) else: # Merge operation filename = self.merge_cfg cfg_file = self._gen_full_path(filename) if not self._check_file_exists(cfg_file): raise MergeConfigException("Merge source config file does not exist") cmd = 'copy {} running-config'.format(cfg_file) self._disable_confirm() output = self._commit_hostname_handler(cmd) self._enable_confirm() if 'Invalid input detected' in output: self.rollback() err_header = "Configuration merge failed; automatic rollback attempted" merge_error = "{0}:\n{1}".format(err_header, output) raise MergeConfigException(merge_error) # Save config to startup (both replace and merge) output += self.device.send_command_expect("write mem") def discard_config(self): """Set candidate_cfg to current running-config. 
Erase the merge_cfg file.""" discard_candidate = 'copy running-config {}'.format(self._gen_full_path(self.candidate_cfg)) discard_merge = 'copy null: {}'.format(self._gen_full_path(self.merge_cfg)) self._disable_confirm() self.device.send_command_expect(discard_candidate) self.device.send_command_expect(discard_merge) self._enable_confirm() def rollback(self): """Rollback configuration to filename or to self.rollback_cfg file.""" filename = self.rollback_cfg cfg_file = self._gen_full_path(filename) if not self._check_file_exists(cfg_file): raise ReplaceConfigException("Rollback config file does not exist") cmd = 'configure replace {} force'.format(cfg_file) self.device.send_command_expect(cmd) # Save config to startup self.device.send_command_expect("write mem") def _inline_tcl_xfer(self, source_file=None, source_config=None, dest_file=None, file_system=None): """ Use Netmiko InlineFileTransfer (TCL) to transfer file or config to remote device. Return (status, msg) status = boolean msg = details on what happened """ if source_file: return self._xfer_file(source_file=source_file, dest_file=dest_file, file_system=file_system, TransferClass=InLineTransfer) if source_config: return self._xfer_file(source_config=source_config, dest_file=dest_file, file_system=file_system, TransferClass=InLineTransfer) raise ValueError("File source not specified for transfer.") def _scp_file(self, source_file, dest_file, file_system): """ SCP file to remote device. Return (status, msg) status = boolean msg = details on what happened """ return self._xfer_file(source_file=source_file, dest_file=dest_file, file_system=file_system, TransferClass=FileTransfer) def _xfer_file(self, source_file=None, source_config=None, dest_file=None, file_system=None, TransferClass=FileTransfer): """Transfer file to remote device. By default, this will use Secure Copy if self.inline_transfer is set, then will use Netmiko InlineTransfer method to transfer inline using either SSH or telnet (plus TCL onbox). 
Return (status, msg) status = boolean msg = details on what happened """ if not source_file and not source_config: raise ValueError("File source not specified for transfer.") if not dest_file or not file_system: raise ValueError("Destination file or file system not specified.") if source_file: kwargs = dict(ssh_conn=self.device, source_file=source_file, dest_file=dest_file, direction='put', file_system=file_system) elif source_config: kwargs = dict(ssh_conn=self.device, source_config=source_config, dest_file=dest_file, direction='put', file_system=file_system) enable_scp = True if self.inline_transfer: enable_scp = False with TransferClass(**kwargs) as transfer: # Check if file already exists and has correct MD5 if transfer.check_file_exists() and transfer.compare_md5(): msg = "File already exists and has correct MD5: no SCP needed" return (True, msg) if not transfer.verify_space_available(): msg = "Insufficient space available on remote device" return (False, msg) if enable_scp: transfer.enable_scp() # Transfer file transfer.transfer_file() # Compares MD5 between local-remote files if transfer.verify_file(): msg = "File successfully transferred to remote device" return (True, msg) else: msg = "File transfer to remote device failed" return (False, msg) return (False, '') def _enable_confirm(self): """Enable IOS confirmations on file operations (global config command).""" cmd = 'no file prompt quiet' self.device.send_config_set([cmd]) def _disable_confirm(self): """Disable IOS confirmations on file operations (global config command).""" cmd = 'file prompt quiet' self.device.send_config_set([cmd]) def _gen_full_path(self, filename, file_system=None): """Generate full file path on remote device.""" if file_system is None: return '{}/{}'.format(self.dest_file_system, filename) else: if ":" not in file_system: raise ValueError("Invalid file_system specified: {}".format(file_system)) return '{}/{}'.format(file_system, filename) def _gen_rollback_cfg(self): """Save a configuration that can be used for rollback.""" cfg_file = self._gen_full_path(self.rollback_cfg) cmd = 'copy running-config {}'.format(cfg_file) self._disable_confirm() self.device.send_command_expect(cmd) self._enable_confirm() def _check_file_exists(self, cfg_file): """ Check that the file exists on remote device using full path. cfg_file is full path i.e. flash:/file_name For example # dir flash:/candidate_config.txt Directory of flash:/candidate_config.txt 33 -rw- 5592 Dec 18 2015 10:50:22 -08:00 candidate_config.txt return boolean """ cmd = 'dir {}'.format(cfg_file) success_pattern = 'Directory of {}'.format(cfg_file) output = self.device.send_command_expect(cmd) if 'Error opening' in output: return False elif success_pattern in output: return True return False def _expand_interface_name(self, interface_brief): """ Obtain the full interface name from the abbreviated name. Cache mappings in self.interface_map. """ if self.interface_map.get(interface_brief): return self.interface_map.get(interface_brief) command = 'show int {}'.format(interface_brief) output = self._send_command(command) first_line = output.splitlines()[0] if 'line protocol' in first_line: full_int_name = first_line.split()[0] self.interface_map[interface_brief] = full_int_name return self.interface_map.get(interface_brief) else: return interface_brief @staticmethod def _send_command_postprocess(output): """ Cleanup actions on send_command() for NAPALM getters. 
Remove "Load for five sec; one minute if in output" Remove "Time source is" """ output = re.sub(r"^Load for five secs.*$", "", output, flags=re.M) output = re.sub(r"^Time source is .*$", "", output, flags=re.M) return output.strip() def get_optics(self): command = 'show interfaces transceiver' output = self._send_command(command) # Check if router supports the command if '% Invalid input' in output: return {} # Formatting data into return data structure optics_detail = {} try: split_output = re.split(r'^---------.*$', output, flags=re.M)[1] except IndexError: return {} split_output = split_output.strip() for optics_entry in split_output.splitlines(): # Example, Te1/0/1 34.6 3.29 -2.0 -3.5 try: split_list = optics_entry.split() except ValueError: return {} int_brief = split_list[0] output_power = split_list[3] input_power = split_list[4] port = self._expand_interface_name(int_brief) port_detail = {} port_detail['physical_channels'] = {} port_detail['physical_channels']['channel'] = [] # If interface is shutdown it returns "N/A" as output power. # Converting that to -100.0 float try: float(output_power) except ValueError: output_power = -100.0 # Defaulting avg, min, max values to -100.0 since device does not # return these values optic_states = { 'index': 0, 'state': { 'input_power': { 'instant': (float(input_power) if 'input_power' else -100.0), 'avg': -100.0, 'min': -100.0, 'max': -100.0 }, 'output_power': { 'instant': (float(output_power) if 'output_power' else -100.0), 'avg': -100.0, 'min': -100.0, 'max': -100.0 }, 'laser_bias_current': { 'instant': 0.0, 'avg': 0.0, 'min': 0.0, 'max': 0.0 } } } port_detail['physical_channels']['channel'].append(optic_states) optics_detail[port] = port_detail return optics_detail def get_lldp_neighbors(self): """IOS implementation of get_lldp_neighbors.""" lldp = {} command = 'show lldp neighbors' output = self._send_command(command) # Check if router supports the command if '% Invalid input' in output: return {} # Process the output to obtain just the LLDP entries try: split_output = re.split(r'^Device ID.*$', output, flags=re.M)[1] split_output = re.split(r'^Total entries displayed.*$', split_output, flags=re.M)[0] except IndexError: return {} split_output = split_output.strip() for lldp_entry in split_output.splitlines(): # Example, twb-sf-hpsw1 Fa4 120 B 17 try: device_id, local_int_brief, hold_time, capability, remote_port = lldp_entry.split() except ValueError: if len(lldp_entry.split()) == 4: # Four fields might be long_name or missing capability capability_missing = True if lldp_entry[46] == ' ' else False if capability_missing: device_id, local_int_brief, hold_time, remote_port = lldp_entry.split() else: # Might be long_name issue tmp_field, hold_time, capability, remote_port = lldp_entry.split() device_id = tmp_field[:20] local_int_brief = tmp_field[20:] # device_id might be abbreviated, try to get full name lldp_tmp = self._lldp_detail_parser(local_int_brief) device_id_new = lldp_tmp[3][0] # Verify abbreviated and full name are consistent if device_id_new[:20] == device_id: device_id = device_id_new else: raise ValueError("Unable to obtain remote device name") local_port = self._expand_interface_name(local_int_brief) entry = {'port': remote_port, 'hostname': device_id} lldp.setdefault(local_port, []) lldp[local_port].append(entry) return lldp def _lldp_detail_parser(self, interface): command = "show lldp neighbors {} detail".format(interface) output = self._send_command(command) # Check if router supports the command if '% Invalid input' in 
output: raise ValueError("Command not supported by network device") # Cisco generally use : for string divider, but sometimes has ' - ' port_id = re.findall(r"Port id\s*?[:-]\s+(.+)", output) port_description = re.findall(r"Port Description\s*?[:-]\s+(.+)", output) chassis_id = re.findall(r"Chassis id\s*?[:-]\s+(.+)", output) system_name = re.findall(r"System Name\s*?[:-]\s+(.+)", output) system_description = re.findall(r"System Description\s*?[:-]\s*(not advertised|\s*\n.+)", output) system_description = [x.strip() for x in system_description] system_capabilities = re.findall(r"System Capabilities\s*?[:-]\s+(.+)", output) enabled_capabilities = re.findall(r"Enabled Capabilities\s*?[:-]\s+(.+)", output) remote_address = re.findall(r"Management Addresses\s*[:-]\s*(not advertised|\n.+)", output) # remote address had two possible patterns which required some secondary processing new_remote_address = [] for val in remote_address: val = val.strip() pattern = r'(?:IP|Other)(?::\s+?)(.+)' match = re.search(pattern, val) if match: new_remote_address.append(match.group(1)) else: new_remote_address.append(val) remote_address = new_remote_address return [port_id, port_description, chassis_id, system_name, system_description, system_capabilities, enabled_capabilities, remote_address] def get_lldp_neighbors_detail(self, interface=''): """ IOS implementation of get_lldp_neighbors_detail. Calls get_lldp_neighbors. """ lldp = {} lldp_neighbors = self.get_lldp_neighbors() # Filter to specific interface if interface: lldp_data = lldp_neighbors.get(interface) if lldp_data: lldp_neighbors = {interface: lldp_data} else: lldp_neighbors = {} for interface in lldp_neighbors: local_port = interface lldp_fields = self._lldp_detail_parser(interface) # Convert any 'not advertised' to 'N/A' for field in lldp_fields: for i, value in enumerate(field): if 'not advertised' in value: field[i] = 'N/A' number_entries = len(lldp_fields[0]) # re.findall will return a list. Make sure same number of entries always returned. for test_list in lldp_fields: if len(test_list) != number_entries: raise ValueError("Failure processing show lldp neighbors detail") # Standardize the fields port_id, port_description, chassis_id, system_name, system_description, \ system_capabilities, enabled_capabilities, remote_address = lldp_fields standardized_fields = zip(port_id, port_description, chassis_id, system_name, system_description, system_capabilities, enabled_capabilities, remote_address) lldp.setdefault(local_port, []) for entry in standardized_fields: remote_port_id, remote_port_description, remote_chassis_id, remote_system_name, \ remote_system_description, remote_system_capab, remote_enabled_capab, \ remote_mgmt_address = entry lldp[local_port].append({ 'parent_interface': u'N/A', 'remote_port': remote_port_id, 'remote_port_description': remote_port_description, 'remote_chassis_id': remote_chassis_id, 'remote_system_name': remote_system_name, 'remote_system_description': remote_system_description, 'remote_system_capab': remote_system_capab, 'remote_system_enable_capab': remote_enabled_capab}) return lldp @staticmethod def parse_uptime(uptime_str): """ Extract the uptime string from the given Cisco IOS Device. 
Return the uptime in seconds as an integer """ # Initialize to zero (years, weeks, days, hours, minutes) = (0, 0, 0, 0, 0) uptime_str = uptime_str.strip() time_list = uptime_str.split(',') for element in time_list: if re.search("year", element): years = int(element.split()[0]) elif re.search("week", element): weeks = int(element.split()[0]) elif re.search("day", element): days = int(element.split()[0]) elif re.search("hour", element): hours = int(element.split()[0]) elif re.search("minute", element): minutes = int(element.split()[0]) uptime_sec = (years * YEAR_SECONDS) + (weeks * WEEK_SECONDS) + (days * DAY_SECONDS) + \ (hours * 3600) + (minutes * 60) return uptime_sec def get_facts(self): """Return a set of facts from the devices.""" # default values. vendor = u'Cisco' uptime = -1 serial_number, fqdn, os_version, hostname, domain_name = ('Unknown',) * 5 # obtain output from device show_ver = self._send_command('show version') show_hosts = self._send_command('show hosts') show_ip_int_br = self._send_command('show ip interface brief') # uptime/serial_number/IOS version for line in show_ver.splitlines(): if ' uptime is ' in line: hostname, uptime_str = line.split(' uptime is ') uptime = self.parse_uptime(uptime_str) hostname = hostname.strip() if 'Processor board ID' in line: _, serial_number = line.split("Processor board ID ") serial_number = serial_number.strip() if re.search(r"Cisco IOS Software", line): try: _, os_version = line.split("Cisco IOS Software, ") except ValueError: # Handle 'Cisco IOS Software [Denali],' _, os_version = re.split(r"Cisco IOS Software \[.*?\], ", line) os_version = os_version.strip() elif re.search(r"IOS (tm).+Software", line): _, os_version = line.split("IOS (tm) ") os_version = os_version.strip() # Determine domain_name and fqdn for line in show_hosts.splitlines(): if 'Default domain' in line: _, domain_name = line.split("Default domain is ") domain_name = domain_name.strip() break if domain_name != 'Unknown' and hostname != 'Unknown': fqdn = u'{}.{}'.format(hostname, domain_name) # model filter try: match_model = re.search(r"Cisco (.+?) .+bytes of", show_ver, flags=re.IGNORECASE) model = match_model.group(1) except AttributeError: model = u'Unknown' # interface_list filter interface_list = [] show_ip_int_br = show_ip_int_br.strip() for line in show_ip_int_br.splitlines(): if 'Interface ' in line: continue interface = line.split()[0] interface_list.append(interface) return { 'uptime': uptime, 'vendor': vendor, 'os_version': py23_compat.text_type(os_version), 'serial_number': py23_compat.text_type(serial_number), 'model': py23_compat.text_type(model), 'hostname': py23_compat.text_type(hostname), 'fqdn': fqdn, 'interface_list': interface_list } def get_interfaces(self): """ Get interface details. last_flapped is not implemented Example Output: { u'Vlan1': { 'description': u'N/A', 'is_enabled': True, 'is_up': True, 'last_flapped': -1.0, 'mac_address': u'a493.4cc1.67a7', 'speed': 100}, u'Vlan100': { 'description': u'Data Network', 'is_enabled': True, 'is_up': True, 'last_flapped': -1.0, 'mac_address': u'a493.4cc1.67a7', 'speed': 100}, u'Vlan200': { 'description': u'Voice Network', 'is_enabled': True, 'is_up': True, 'last_flapped': -1.0, 'mac_address': u'a493.4cc1.67a7', 'speed': 100}} """ # default values. 
last_flapped = -1.0 command = 'show interfaces' output = self._send_command(command) interface = description = mac_address = speed = speedformat = '' is_enabled = is_up = None interface_dict = {} for line in output.splitlines(): interface_regex_1 = r"^(\S+?)\s+is\s+(.+?),\s+line\s+protocol\s+is\s+(\S+)" interface_regex_2 = r"^(\S+)\s+is\s+(up|down)" for pattern in (interface_regex_1, interface_regex_2): interface_match = re.search(pattern, line) if interface_match: interface = interface_match.group(1) status = interface_match.group(2) try: protocol = interface_match.group(3) except IndexError: protocol = '' if 'admin' in status.lower(): is_enabled = False else: is_enabled = True if protocol: is_up = bool('up' in protocol) else: is_up = bool('up' in status) break mac_addr_regex = r"^\s+Hardware.+address\s+is\s+({})".format(MAC_REGEX) if re.search(mac_addr_regex, line): mac_addr_match = re.search(mac_addr_regex, line) mac_address = napalm_base.helpers.mac(mac_addr_match.groups()[0]) descr_regex = "^\s+Description:\s+(.+?)$" if re.search(descr_regex, line): descr_match = re.search(descr_regex, line) description = descr_match.groups()[0] speed_regex = r"^\s+MTU\s+\d+.+BW\s+(\d+)\s+([KMG]?b)" if re.search(speed_regex, line): speed_match = re.search(speed_regex, line) speed = speed_match.groups()[0] speedformat = speed_match.groups()[1] speed = float(speed) if speedformat.startswith('Kb'): speed = speed / 1000.0 elif speedformat.startswith('Gb'): speed = speed * 1000 speed = int(round(speed)) if interface == '': raise ValueError("Interface attributes were \ found without any known interface") if not isinstance(is_up, bool) or not isinstance(is_enabled, bool): raise ValueError("Did not correctly find the interface status") interface_dict[interface] = {'is_enabled': is_enabled, 'is_up': is_up, 'description': description, 'mac_address': mac_address, 'last_flapped': last_flapped, 'speed': speed} interface = description = mac_address = speed = speedformat = '' is_enabled = is_up = None return interface_dict def get_interfaces_ip(self): """ Get interface ip details. 
Returns a dict of dicts Example Output: { u'FastEthernet8': { 'ipv4': { u'10.66.43.169': { 'prefix_length': 22}}}, u'Loopback555': { 'ipv4': { u'192.168.1.1': { 'prefix_length': 24}}, 'ipv6': { u'1::1': { 'prefix_length': 64}, u'2001:DB8:1::1': { 'prefix_length': 64}, u'2::': { 'prefix_length': 64}, u'FE80::3': { 'prefix_length': 10}}}, u'Tunnel0': { 'ipv4': { u'10.63.100.9': { 'prefix_length': 24}}}, u'Tunnel1': { 'ipv4': { u'10.63.101.9': { 'prefix_length': 24}}}, u'Vlan100': { 'ipv4': { u'10.40.0.1': { 'prefix_length': 24}, u'10.41.0.1': { 'prefix_length': 24}, u'10.65.0.1': { 'prefix_length': 24}}}, u'Vlan200': { 'ipv4': { u'10.63.176.57': { 'prefix_length': 29}}}} """ interfaces = {} command = 'show ip interface' show_ip_interface = self._send_command(command) command = 'show ipv6 interface' show_ipv6_interface = self._send_command(command) INTERNET_ADDRESS = r'\s+(?:Internet address is|Secondary address)' INTERNET_ADDRESS += r' (?P{})/(?P\d+)'.format(IPV4_ADDR_REGEX) LINK_LOCAL_ADDRESS = r'\s+IPv6 is enabled, link-local address is (?P[a-fA-F0-9:]+)' GLOBAL_ADDRESS = r'\s+(?P[a-fA-F0-9:]+), subnet is (?:[a-fA-F0-9:]+)/(?P\d+)' interfaces = {} for line in show_ip_interface.splitlines(): if(len(line.strip()) == 0): continue if(line[0] != ' '): ipv4 = {} interface_name = line.split()[0] m = re.match(INTERNET_ADDRESS, line) if m: ip, prefix = m.groups() ipv4.update({ip: {"prefix_length": int(prefix)}}) interfaces[interface_name] = {'ipv4': ipv4} for line in show_ipv6_interface.splitlines(): if(len(line.strip()) == 0): continue if(line[0] != ' '): ifname = line.split()[0] ipv6 = {} if ifname not in interfaces: interfaces[ifname] = {'ipv6': ipv6} else: interfaces[ifname].update({'ipv6': ipv6}) m = re.match(LINK_LOCAL_ADDRESS, line) if m: ip = m.group(1) ipv6.update({ip: {"prefix_length": 10}}) m = re.match(GLOBAL_ADDRESS, line) if m: ip, prefix = m.groups() ipv6.update({ip: {"prefix_length": int(prefix)}}) # Interface without ipv6 doesn't appears in show ipv6 interface return interfaces @staticmethod def bgp_time_conversion(bgp_uptime): """ Convert string time to seconds. Examples 00:14:23 00:13:40 00:00:21 00:00:13 00:00:49 1d11h 1d17h 1w0d 8w5d 1y28w never """ bgp_uptime = bgp_uptime.strip() uptime_letters = set(['w', 'h', 'd']) if 'never' in bgp_uptime: return -1 elif ':' in bgp_uptime: times = bgp_uptime.split(":") times = [int(x) for x in times] hours, minutes, seconds = times return (hours * 3600) + (minutes * 60) + seconds # Check if any letters 'w', 'h', 'd' are in the time string elif uptime_letters & set(bgp_uptime): form1 = r'(\d+)d(\d+)h' # 1d17h form2 = r'(\d+)w(\d+)d' # 8w5d form3 = r'(\d+)y(\d+)w' # 1y28w match = re.search(form1, bgp_uptime) if match: days = int(match.group(1)) hours = int(match.group(2)) return (days * DAY_SECONDS) + (hours * 3600) match = re.search(form2, bgp_uptime) if match: weeks = int(match.group(1)) days = int(match.group(2)) return (weeks * WEEK_SECONDS) + (days * DAY_SECONDS) match = re.search(form3, bgp_uptime) if match: years = int(match.group(1)) weeks = int(match.group(2)) return (years * YEAR_SECONDS) + (weeks * WEEK_SECONDS) raise ValueError("Unexpected value for BGP uptime string: {}".format(bgp_uptime)) def get_bgp_neighbors(self): """BGP neighbor information. Currently no VRF support. Supports both IPv4 and IPv6. 
""" supported_afi = ['ipv4', 'ipv6'] bgp_neighbor_data = dict() bgp_neighbor_data['global'] = {} # get summary output from device cmd_bgp_all_sum = 'show bgp all summary' summary_output = self._send_command(cmd_bgp_all_sum).strip() # get neighbor output from device neighbor_output = '' for afi in supported_afi: cmd_bgp_neighbor = 'show bgp %s unicast neighbors' % afi neighbor_output += self._send_command(cmd_bgp_neighbor).strip() # trailing newline required for parsing neighbor_output += "\n" # Regular expressions used for parsing BGP summary parse_summary = { 'patterns': [ # For address family: IPv4 Unicast {'regexp': re.compile(r'^For address family: (?P\S+) '), 'record': False}, # Capture router_id and local_as values, e.g.: # BGP router identifier 10.0.1.1, local AS number 65000 {'regexp': re.compile(r'^.* router identifier (?P{}), ' r'local AS number (?P{})'.format( IPV4_ADDR_REGEX, ASN_REGEX )), 'record': False}, # Match neighbor summary row, capturing useful details and # discarding the 5 columns that we don't care about, e.g.: # Neighbor V AS MsgRcvd MsgSent TblVer InQ OutQ Up/Down State/PfxRcd # 10.0.0.2 4 65000 1336020 64337701 1011343614 0 0 8w0d 3143 {'regexp': re.compile(r'^\*?(?P({})|({}))' r'\s+\d+\s+(?P{})(\s+\S+){{5}}\s+' r'(?P(never)|\d+\S+)' r'\s+(?P\d+)'.format( IPV4_ADDR_REGEX, IPV6_ADDR_REGEX, ASN_REGEX )), 'record': True}, # Same as above, but for peer that are not Established, e.g.: # Neighbor V AS MsgRcvd MsgSent TblVer InQ OutQ Up/Down State/PfxRcd # 192.168.0.2 4 65002 0 0 1 0 0 never Active {'regexp': re.compile(r'^\*?(?P({})|({}))' r'\s+\d+\s+(?P{})(\s+\S+){{5}}\s+' r'(?P(never)|\d+\S+)\s+(?P\D.*)'.format( IPV4_ADDR_REGEX, IPV6_ADDR_REGEX, ASN_REGEX )), 'record': True}, # ipv6 peers often break accross rows because of the longer peer address, # match as above, but in separate expressions, e.g.: # Neighbor V AS MsgRcvd MsgSent TblVer InQ OutQ Up/Down State/PfxRcd # 2001:DB8::4 # 4 65004 9900690 612449 155362939 0 0 26w6d 36391 {'regexp': re.compile(r'^\*?(?P({})|({}))'.format( IPV4_ADDR_REGEX, IPV6_ADDR_REGEX )), 'record': False}, {'regexp': re.compile(r'^\s+\d+\s+(?P{})(\s+\S+){{5}}\s+' r'(?P(never)|\d+\S+)' r'\s+(?P\d+)'.format( ASN_REGEX )), 'record': True}, # Same as above, but for peers that are not Established, e.g.: # Neighbor V AS MsgRcvd MsgSent TblVer InQ OutQ Up/Down State/PfxRcd # 2001:DB8::3 # 4 65003 0 0 1 0 0 never Idle (Admin) {'regexp': re.compile(r'^\s+\d+\s+(?P{})(\s+\S+){{5}}\s+' r'(?P(never)|\d+\S+)\s+(?P\D.*)'.format( ASN_REGEX )), 'record': True} ], 'no_fill_fields': ['accepted_prefixes', 'state', 'uptime', 'remote_as', 'remote_addr'] } parse_neighbors = { 'patterns': [ # Capture BGP neighbor is 10.0.0.2, remote AS 65000, internal link {'regexp': re.compile(r'^BGP neighbor is (?P({})|({})),' r'\s+remote AS (?P{}).*'.format( IPV4_ADDR_REGEX, IPV6_ADDR_REGEX, ASN_REGEX )), 'record': False}, # Capture description {'regexp': re.compile(r'^\s+Description: (?P.+)'), 'record': False}, # Capture remote_id, e.g.: # BGP version 4, remote router ID 10.0.1.2 {'regexp': re.compile(r'^\s+BGP version \d+, remote router ID ' r'(?P{})'.format(IPV4_ADDR_REGEX)), 'record': False}, # Capture AFI and SAFI names, e.g.: # For address family: IPv4 Unicast {'regexp': re.compile(r'^\s+For address family: (?P\S+) '), 'record': False}, # Capture current sent and accepted prefixes, e.g.: # Prefixes Current: 637213 3142 (Consumes 377040 bytes) {'regexp': re.compile(r'^\s+Prefixes Current:\s+(?P\d+)\s+' r'(?P\d+).*'), 'record': False}, # Capture received_prefixes if 
soft-reconfig is enabled for the peer {'regexp': re.compile(r'^\s+Saved (soft-reconfig):.+(?P\d+).*'), 'record': True}, # Otherwise, use the following as an end of row marker {'regexp': re.compile(r'^\s+Local Policy Denied Prefixes:.+'), 'record': True} ], # fields that should not be "filled down" across table rows 'no_fill_fields': ['received_prefixes', 'accepted_prefixes', 'sent_prefixes'] } # Parse outputs into a list of dicts summary_data = [] summary_data_entry = {} for line in summary_output.splitlines(): # check for matches against each pattern for item in parse_summary['patterns']: match = item['regexp'].match(line) if match: # a match was found, so update the temp entry with the match's groupdict summary_data_entry.update(match.groupdict()) if item['record']: # Record indicates the last piece of data has been obtained; move # on to next entry summary_data.append(copy.deepcopy(summary_data_entry)) # remove keys that are listed in no_fill_fields before the next pass for field in parse_summary['no_fill_fields']: try: del summary_data_entry[field] except KeyError: pass break neighbor_data = [] neighbor_data_entry = {} for line in neighbor_output.splitlines(): # check for matches against each pattern for item in parse_neighbors['patterns']: match = item['regexp'].match(line) if match: # a match was found, so update the temp entry with the match's groupdict neighbor_data_entry.update(match.groupdict()) if item['record']: # Record indicates the last piece of data has been obtained; move # on to next entry neighbor_data.append(copy.deepcopy(neighbor_data_entry)) # remove keys that are listed in no_fill_fields before the next pass for field in parse_neighbors['no_fill_fields']: try: del neighbor_data_entry[field] except KeyError: pass break router_id = None for entry in summary_data: if not router_id: router_id = entry['router_id'] elif entry['router_id'] != router_id: raise ValueError # check the router_id looks like an ipv4 address router_id = napalm_base.helpers.ip(router_id, version=4) # add parsed data to output dict bgp_neighbor_data['global']['router_id'] = router_id bgp_neighbor_data['global']['peers'] = {} for entry in summary_data: remote_addr = napalm_base.helpers.ip(entry['remote_addr']) afi = entry['afi'].lower() # check that we're looking at a supported afi if afi not in supported_afi: continue # get neighbor_entry out of neighbor data neighbor_entry = None for neighbor in neighbor_data: if (neighbor['afi'].lower() == afi and napalm_base.helpers.ip(neighbor['remote_addr']) == remote_addr): neighbor_entry = neighbor break if not isinstance(neighbor_entry, dict): raise ValueError(msg="Couldn't find neighbor data for %s in afi %s" % (remote_addr, afi)) # check for admin down state try: if "(Admin)" in entry['state']: is_enabled = False else: is_enabled = True except KeyError: is_enabled = True # parse uptime value uptime = self.bgp_time_conversion(entry['uptime']) # Uptime should be -1 if BGP session not up is_up = True if uptime >= 0 else False # check whether session is up for address family and get prefix count try: accepted_prefixes = int(entry['accepted_prefixes']) except (ValueError, KeyError): accepted_prefixes = -1 # Only parse neighbor detailed data if BGP session is-up if is_up: try: # overide accepted_prefixes with neighbor data if possible (since that's newer) accepted_prefixes = int(neighbor_entry['accepted_prefixes']) except (ValueError, KeyError): pass # try to get received prefix count, otherwise set to accepted_prefixes received_prefixes = 
neighbor_entry.get('received_prefixes', accepted_prefixes) # try to get sent prefix count and convert to int, otherwise set to -1 sent_prefixes = int(neighbor_entry.get('sent_prefixes', -1)) else: received_prefixes = -1 sent_prefixes = -1 # get description try: description = py23_compat.text_type(neighbor_entry['description']) except KeyError: description = '' # check the remote router_id looks like an ipv4 address remote_id = napalm_base.helpers.ip(neighbor_entry['remote_id'], version=4) if remote_addr not in bgp_neighbor_data['global']['peers']: bgp_neighbor_data['global']['peers'][remote_addr] = { 'local_as': napalm_base.helpers.as_number(entry['local_as']), 'remote_as': napalm_base.helpers.as_number(entry['remote_as']), 'remote_id': remote_id, 'is_up': is_up, 'is_enabled': is_enabled, 'description': description, 'uptime': uptime, 'address_family': { afi: { 'received_prefixes': received_prefixes, 'accepted_prefixes': accepted_prefixes, 'sent_prefixes': sent_prefixes } } } else: # found previous data for matching remote_addr, but for different afi existing = bgp_neighbor_data['global']['peers'][remote_addr] assert afi not in existing['address_family'] # compare with existing values and croak if they don't match assert existing['local_as'] == napalm_base.helpers.as_number(entry['local_as']) assert existing['remote_as'] == napalm_base.helpers.as_number(entry['remote_as']) assert existing['remote_id'] == remote_id assert existing['is_enabled'] == is_enabled assert existing['description'] == description # merge other values in a sane manner existing['is_up'] = existing['is_up'] or is_up existing['uptime'] = max(existing['uptime'], uptime) existing['address_family'][afi] = { 'received_prefixes': received_prefixes, 'accepted_prefixes': accepted_prefixes, 'sent_prefixes': sent_prefixes } return bgp_neighbor_data def get_interfaces_counters(self): """ Return interface counters and errors. 
'tx_errors': int, 'rx_errors': int, 'tx_discards': int, 'rx_discards': int, 'tx_octets': int, 'rx_octets': int, 'tx_unicast_packets': int, 'rx_unicast_packets': int, 'tx_multicast_packets': int, 'rx_multicast_packets': int, 'tx_broadcast_packets': int, 'rx_broadcast_packets': int, Currently doesn't determine output broadcasts, multicasts """ counters = {} command = 'show interfaces' output = self._send_command(command) sh_int_sum_cmd = 'show interface summary' sh_int_sum_cmd_out = self._send_command(sh_int_sum_cmd) # Break output into per-interface sections interface_strings = re.split(r'.* line protocol is .*', output, flags=re.M) header_strings = re.findall(r'.* line protocol is .*', output, flags=re.M) empty = interface_strings.pop(0).strip() if empty: raise ValueError("Unexpected output from: {}".format(command)) # Parse out the interface names intf = [] for intf_line in header_strings: interface, _ = re.split(r" is .* line protocol is ", intf_line) intf.append(interface.strip()) if len(intf) != len(interface_strings): raise ValueError("Unexpected output from: {}".format(command)) # Re-join interface names with interface strings for interface, interface_str in zip(intf, interface_strings): counters.setdefault(interface, {}) for line in interface_str.splitlines(): if 'packets input' in line: # '0 packets input, 0 bytes, 0 no buffer' match = re.search(r"(\d+) packets input.* (\d+) bytes", line) counters[interface]['rx_unicast_packets'] = int(match.group(1)) counters[interface]['rx_octets'] = int(match.group(2)) elif 'broadcast' in line: # 'Received 0 broadcasts (0 multicasts)' # 'Received 264071 broadcasts (39327 IP multicasts)' # 'Received 338 broadcasts, 0 runts, 0 giants, 0 throttles' match = re.search(r"Received (\d+) broadcasts.*(\d+).*multicasts", line) alt_match = re.search(r"Received (\d+) broadcasts.*", line) if match: counters[interface]['rx_broadcast_packets'] = int(match.group(1)) counters[interface]['rx_multicast_packets'] = int(match.group(2)) elif alt_match: counters[interface]['rx_broadcast_packets'] = int(alt_match.group(1)) counters[interface]['rx_multicast_packets'] = -1 else: counters[interface]['rx_broadcast_packets'] = -1 counters[interface]['rx_multicast_packets'] = -1 elif 'packets output' in line: # '0 packets output, 0 bytes, 0 underruns' match = re.search(r"(\d+) packets output.* (\d+) bytes", line) counters[interface]['tx_unicast_packets'] = int(match.group(1)) counters[interface]['tx_octets'] = int(match.group(2)) counters[interface]['tx_broadcast_packets'] = -1 counters[interface]['tx_multicast_packets'] = -1 elif 'input errors' in line: # '0 input errors, 0 CRC, 0 frame, 0 overrun, 0 ignored' match = re.search(r"(\d+) input errors", line) counters[interface]['rx_errors'] = int(match.group(1)) counters[interface]['rx_discards'] = -1 elif 'output errors' in line: # '0 output errors, 0 collisions, 1 interface resets' match = re.search(r"(\d+) output errors", line) counters[interface]['tx_errors'] = int(match.group(1)) counters[interface]['tx_discards'] = -1 for line in sh_int_sum_cmd_out.splitlines(): if interface in line: # Line is tabular output with columns # Interface IHQ IQD OHQ OQD RXBS RXPS TXBS TXPS TRTL # where columns (excluding interface) are integers regex = r"\b" + interface + \ r"\b\s+(\d+)\s+(?P\d+)\s+(\d+)" + \ r"\s+(?P\d+)\s+(\d+)\s+(\d+)" + \ r"\s+(\d+)\s+(\d+)\s+(\d+)" match = re.search(regex, line) if match: counters[interface]['rx_discards'] = int(match.group("IQD")) counters[interface]['tx_discards'] = int(match.group("OQD")) return 
counters def get_environment(self): """ Get environment facts. power and fan are currently not implemented cpu is using 1-minute average cpu hard-coded to cpu0 (i.e. only a single CPU) """ environment = {} cpu_cmd = 'show proc cpu' mem_cmd = 'show memory statistics' temp_cmd = 'show env temperature status' output = self._send_command(cpu_cmd) environment.setdefault('cpu', {}) environment['cpu'][0] = {} environment['cpu'][0]['%usage'] = 0.0 for line in output.splitlines(): if 'CPU utilization' in line: # CPU utilization for five seconds: 2%/0%; one minute: 2%; five minutes: 1% cpu_regex = r'^.*one minute: (\d+)%; five.*$' match = re.search(cpu_regex, line) environment['cpu'][0]['%usage'] = float(match.group(1)) break output = self._send_command(mem_cmd) for line in output.splitlines(): if 'Processor' in line: _, _, _, proc_used_mem, proc_free_mem = line.split()[:5] elif 'I/O' in line or 'io' in line: _, _, _, io_used_mem, io_free_mem = line.split()[:5] used_mem = int(proc_used_mem) + int(io_used_mem) free_mem = int(proc_free_mem) + int(io_free_mem) environment.setdefault('memory', {}) environment['memory']['used_ram'] = used_mem environment['memory']['available_ram'] = free_mem environment.setdefault('temperature', {}) # The 'show env temperature status' is not ubiquitous in Cisco IOS output = self._send_command(temp_cmd) if '% Invalid' not in output: for line in output.splitlines(): if 'System Temperature Value' in line: system_temp = float(line.split(':')[1].split()[0]) elif 'Yellow Threshold' in line: system_temp_alert = float(line.split(':')[1].split()[0]) elif 'Red Threshold' in line: system_temp_crit = float(line.split(':')[1].split()[0]) env_value = {'is_alert': system_temp >= system_temp_alert, 'is_critical': system_temp >= system_temp_crit, 'temperature': system_temp} environment['temperature']['system'] = env_value else: env_value = {'is_alert': False, 'is_critical': False, 'temperature': -1.0} environment['temperature']['invalid'] = env_value # Initialize 'power' and 'fan' to default values (not implemented) environment.setdefault('power', {}) environment['power']['invalid'] = {'status': True, 'output': -1.0, 'capacity': -1.0} environment.setdefault('fans', {}) environment['fans']['invalid'] = {'status': True} return environment def get_arp_table(self): """ Get arp table information. 
Return a list of dictionaries having the following set of keys: * interface (string) * mac (string) * ip (string) * age (float) For example:: [ { 'interface' : 'MgmtEth0/RSP0/CPU0/0', 'mac' : '5c:5e:ab:da:3c:f0', 'ip' : '172.17.17.1', 'age' : 1454496274.84 }, { 'interface': 'MgmtEth0/RSP0/CPU0/0', 'mac' : '66:0e:94:96:e0:ff', 'ip' : '172.17.17.2', 'age' : 1435641582.49 } ] """ arp_table = [] command = 'show arp | exclude Incomplete' output = self._send_command(command) # Skip the first line which is a header output = output.split('\n') output = output[1:] for line in output: if len(line) == 0: return {} if len(line.split()) == 5: # Static ARP entries have no interface # Internet 10.0.0.1 - 0010.2345.1cda ARPA interface = '' protocol, address, age, mac, eth_type = line.split() elif len(line.split()) == 6: protocol, address, age, mac, eth_type, interface = line.split() else: raise ValueError("Unexpected output from: {}".format(line.split())) try: if age == '-': age = 0 age = float(age) except ValueError: raise ValueError("Unable to convert age value to float: {}".format(age)) # Validate we matched correctly if not re.search(RE_IPADDR, address): raise ValueError("Invalid IP Address detected: {}".format(address)) if not re.search(RE_MAC, mac): raise ValueError("Invalid MAC Address detected: {}".format(mac)) entry = { 'interface': interface, 'mac': napalm_base.helpers.mac(mac), 'ip': address, 'age': age } arp_table.append(entry) return arp_table def cli(self, commands): """ Execute a list of commands and return the output in a dictionary format using the command as the key. Example input: ['show clock', 'show calendar'] Output example: { 'show calendar': u'22:02:01 UTC Thu Feb 18 2016', 'show clock': u'*22:01:51.165 UTC Thu Feb 18 2016'} """ cli_output = dict() if type(commands) is not list: raise TypeError('Please enter a valid list of commands!') for command in commands: output = self._send_command(command) if 'Invalid input detected' in output: raise ValueError('Unable to execute command "{}"'.format(command)) cli_output.setdefault(command, {}) cli_output[command] = output return cli_output def get_ntp_servers(self): """Implementation of get_ntp_servers for IOS. Returns the NTP servers configuration as dictionary. The keys of the dictionary represent the IP Addresses of the servers. Inner dictionaries do not have yet any available keys. 
Example:: { '192.168.0.1': {}, '17.72.148.53': {}, '37.187.56.220': {}, '162.158.20.18': {} } """ ntp_servers = {} command = 'show run | include ntp server' output = self._send_command(command) for line in output.splitlines(): split_line = line.split() if "vrf" == split_line[2]: ntp_servers[split_line[4]] = {} else: ntp_servers[split_line[2]] = {} return ntp_servers def get_ntp_stats(self): """Implementation of get_ntp_stats for IOS.""" ntp_stats = [] command = 'show ntp associations' output = self._send_command(command) for line in output.splitlines(): # Skip first two lines and last line of command output if line == "" or 'address' in line or 'sys.peer' in line: continue if '%NTP is not enabled' in line: return [] elif len(line.split()) == 9: address, ref_clock, st, when, poll, reach, delay, offset, disp = line.split() address_regex = re.match('(\W*)([0-9.*]*)', address) try: ntp_stats.append({ 'remote': py23_compat.text_type(address_regex.group(2)), 'synchronized': ('*' in address_regex.group(1)), 'referenceid': py23_compat.text_type(ref_clock), 'stratum': int(st), 'type': u'-', 'when': py23_compat.text_type(when), 'hostpoll': int(poll), 'reachability': int(reach), 'delay': float(delay), 'offset': float(offset), 'jitter': float(disp) }) except Exception: continue return ntp_stats def get_mac_address_table(self): """ Returns a lists of dictionaries. Each dictionary represents an entry in the MAC Address Table, having the following keys * mac (string) * interface (string) * vlan (int) * active (boolean) * static (boolean) * moves (int) * last_move (float) Format1: Destination Address Address Type VLAN Destination Port ------------------- ------------ ---- -------------------- 6400.f1cf.2cc6 Dynamic 1 Wlan-GigabitEthernet0 Cat 6500: Legend: * - primary entry age - seconds since last seen n/a - not available vlan mac address type learn age ports ------+----------------+--------+-----+----------+-------------------------- * 999 1111.2222.3333 dynamic Yes 0 Port-channel1 999 1111.2222.3333 dynamic Yes 0 Port-channel1 Cat 4948 Unicast Entries vlan mac address type protocols port -------+---------------+--------+---------------------+-------------------- 999 1111.2222.3333 dynamic ip Port-channel1 Cat 2960 Mac Address Table ------------------------------------------- Vlan Mac Address Type Ports ---- ----------- -------- ----- All 1111.2222.3333 STATIC CPU """ RE_MACTABLE_DEFAULT = r"^" + MAC_REGEX RE_MACTABLE_6500_1 = r"^\*\s+{}\s+{}\s+".format(VLAN_REGEX, MAC_REGEX) # 7 fields RE_MACTABLE_6500_2 = r"^{}\s+{}\s+".format(VLAN_REGEX, MAC_REGEX) # 6 fields RE_MACTABLE_6500_3 = r"^\s{51}\S+" # Fill down from prior RE_MACTABLE_4500_1 = r"^{}\s+{}\s+".format(VLAN_REGEX, MAC_REGEX) # 5 fields RE_MACTABLE_4500_2 = r"^\s{32}\S+" # Fill down from prior RE_MACTABLE_2960_1 = r"^All\s+{}".format(MAC_REGEX) RE_MACTABLE_GEN_1 = r"^{}\s+{}\s+".format(VLAN_REGEX, MAC_REGEX) # 4 fields (2960/4500) def process_mac_fields(vlan, mac, mac_type, interface): """Return proper data for mac address fields.""" if mac_type.lower() in ['self', 'static', 'system']: static = True if vlan.lower() == 'all': vlan = 0 if interface.lower() == 'cpu' or re.search(r'router', interface.lower()) or \ re.search(r'switch', interface.lower()): interface = '' else: static = False if mac_type.lower() in ['dynamic']: active = True else: active = False return { 'mac': napalm_base.helpers.mac(mac), 'interface': interface, 'vlan': int(vlan), 'static': static, 'active': active, 'moves': -1, 'last_move': -1.0 } mac_address_table = [] command = 
IOS_COMMANDS['show_mac_address'] output = self._send_command(command) # Skip the header lines output = re.split(r'^----.*', output, flags=re.M)[1:] output = "\n".join(output).strip() # Strip any leading astericks output = re.sub(r"^\*", "", output, flags=re.M) fill_down_vlan = fill_down_mac = fill_down_mac_type = '' for line in output.splitlines(): # Cat6500 one off anf 4500 multicast format if (re.search(RE_MACTABLE_6500_3, line) or re.search(RE_MACTABLE_4500_2, line)): interface = line.strip() if ',' in interface: interfaces = interface.split(',') else: interfaces = [] interfaces.append(interface) for single_interface in interfaces: mac_address_table.append(process_mac_fields(fill_down_vlan, fill_down_mac, fill_down_mac_type, single_interface)) continue line = line.strip() if line == '': continue if re.search(r"^---", line): # Convert any '---' to VLAN 0 line = re.sub(r"^---", "0", line, flags=re.M) # Format1 if re.search(RE_MACTABLE_DEFAULT, line): if len(line.split()) == 4: mac, mac_type, vlan, interface = line.split() mac_address_table.append(process_mac_fields(vlan, mac, mac_type, interface)) else: raise ValueError("Unexpected output from: {}".format(line.split())) # Cat6500 format elif (re.search(RE_MACTABLE_6500_1, line) or re.search(RE_MACTABLE_6500_2, line)) and \ len(line.split()) >= 6: if len(line.split()) == 7: _, vlan, mac, mac_type, _, _, interface = line.split() elif len(line.split()) == 6: vlan, mac, mac_type, _, _, interface = line.split() if ',' in interface: interfaces = interface.split(',') fill_down_vlan = vlan fill_down_mac = mac fill_down_mac_type = mac_type for single_interface in interfaces: mac_address_table.append(process_mac_fields(vlan, mac, mac_type, single_interface)) else: mac_address_table.append(process_mac_fields(vlan, mac, mac_type, interface)) # Cat4500 format elif re.search(RE_MACTABLE_4500_1, line) and len(line.split()) == 5: vlan, mac, mac_type, _, interface = line.split() mac_address_table.append(process_mac_fields(vlan, mac, mac_type, interface)) # Cat2960 format - ignore extra header line elif re.search(r"^Vlan\s+Mac Address\s+", line): continue # Cat2960 format (Cat4500 format multicast entries) elif (re.search(RE_MACTABLE_2960_1, line) or re.search(RE_MACTABLE_GEN_1, line)) and \ len(line.split()) == 4: vlan, mac, mac_type, interface = line.split() if ',' in interface: interfaces = interface.split(',') fill_down_vlan = vlan fill_down_mac = mac fill_down_mac_type = mac_type for single_interface in interfaces: mac_address_table.append(process_mac_fields(vlan, mac, mac_type, single_interface)) else: mac_address_table.append(process_mac_fields(vlan, mac, mac_type, interface)) elif re.search(r"Total Mac Addresses", line): continue elif re.search(r"Multicast Entries", line): continue elif re.search(r"vlan.*mac.*address.*type.*", line): continue else: raise ValueError("Unexpected output from: {}".format(repr(line))) return mac_address_table def get_snmp_information(self): """ Returns a dict of dicts Example Output: { 'chassis_id': u'Asset Tag 54670', 'community': { u'private': { 'acl': u'12', 'mode': u'rw'}, u'public': { 'acl': u'11', 'mode': u'ro'}, u'public_named_acl': { 'acl': u'ALLOW-SNMP-ACL', 'mode': u'ro'}, u'public_no_acl': { 'acl': u'N/A', 'mode': u'ro'}}, 'contact': u'Joe Smith', 'location': u'123 Anytown USA Rack 404'} """ # default values snmp_dict = { 'chassis_id': u'unknown', 'community': {}, 'contact': u'unknown', 'location': u'unknown' } command = 'show run | include snmp-server' output = self._send_command(command) for line in 
    def get_snmp_information(self):
        """
        Returns a dict of dicts.

        Example Output:

        {
            'chassis_id': u'Asset Tag 54670',
            'community': {
                u'private': {'acl': u'12', 'mode': u'rw'},
                u'public': {'acl': u'11', 'mode': u'ro'},
                u'public_named_acl': {'acl': u'ALLOW-SNMP-ACL', 'mode': u'ro'},
                u'public_no_acl': {'acl': u'N/A', 'mode': u'ro'}
            },
            'contact': u'Joe Smith',
            'location': u'123 Anytown USA Rack 404'
        }
        """
        # default values
        snmp_dict = {
            'chassis_id': u'unknown',
            'community': {},
            'contact': u'unknown',
            'location': u'unknown'
        }
        command = 'show run | include snmp-server'
        output = self._send_command(command)
        for line in output.splitlines():
            fields = line.split()
            if 'snmp-server community' in line:
                name = fields[2]
                if 'community' not in snmp_dict.keys():
                    snmp_dict.update({'community': {}})
                snmp_dict['community'].update({name: {}})
                try:
                    snmp_dict['community'][name].update({'mode': fields[3].lower()})
                except IndexError:
                    snmp_dict['community'][name].update({'mode': u'N/A'})
                try:
                    snmp_dict['community'][name].update({'acl': fields[4]})
                except IndexError:
                    snmp_dict['community'][name].update({'acl': u'N/A'})
            elif 'snmp-server location' in line:
                snmp_dict['location'] = ' '.join(fields[2:])
            elif 'snmp-server contact' in line:
                snmp_dict['contact'] = ' '.join(fields[2:])
            elif 'snmp-server chassis-id' in line:
                snmp_dict['chassis_id'] = ' '.join(fields[2:])

        # If the SNMP chassis-id wasn't found in the config, obtain it with a direct command
        if snmp_dict['chassis_id'] == 'unknown':
            command = 'show snmp chassis'
            snmp_chassis = self._send_command(command)
            snmp_dict['chassis_id'] = snmp_chassis
        return snmp_dict

    def ping(self, destination, source=C.PING_SOURCE, ttl=C.PING_TTL, timeout=C.PING_TIMEOUT,
             size=C.PING_SIZE, count=C.PING_COUNT, vrf=C.PING_VRF):
        """
        Executes ping on the device and returns a dictionary with the result.

        Output dictionary has one of the following keys:

            * success
            * error

        In case of success, the inner dictionary will have the following keys:

            * probes_sent (int)
            * packet_loss (int)
            * rtt_min (float)
            * rtt_max (float)
            * rtt_avg (float)
            * rtt_stddev (float)
            * results (list)

        'results' is a list of dictionaries with the following keys:

            * ip_address (str)
            * rtt (float)
        """
        ping_dict = {}

        # vrf needs to be right after the ping command
        if vrf:
            command = 'ping vrf {} {}'.format(vrf, destination)
        else:
            command = 'ping {}'.format(destination)
        command += ' timeout {}'.format(timeout)
        command += ' size {}'.format(size)
        command += ' repeat {}'.format(count)
        if source != '':
            command += ' source {}'.format(source)

        output = self._send_command(command)
        if '%' in output:
            ping_dict['error'] = output
        elif 'Sending' in output:
            ping_dict['success'] = {
                'probes_sent': 0,
                'packet_loss': 0,
                'rtt_min': 0.0,
                'rtt_max': 0.0,
                'rtt_avg': 0.0,
                'rtt_stddev': 0.0,
                'results': []
            }
            for line in output.splitlines():
                fields = line.split()
                if 'Success rate is 0' in line:
                    sent_and_received = re.search(r'\((\d*)/(\d*)\)', fields[5])
                    probes_sent = int(sent_and_received.groups()[0])
                    probes_received = int(sent_and_received.groups()[1])
                    ping_dict['success']['probes_sent'] = probes_sent
                    ping_dict['success']['packet_loss'] = probes_sent - probes_received
                elif 'Success rate is' in line:
                    sent_and_received = re.search(r'\((\d*)/(\d*)\)', fields[5])
                    probes_sent = int(sent_and_received.groups()[0])
                    probes_received = int(sent_and_received.groups()[1])
                    min_avg_max = re.search(r'(\d*)/(\d*)/(\d*)', fields[9])
                    ping_dict['success']['probes_sent'] = probes_sent
                    ping_dict['success']['packet_loss'] = probes_sent - probes_received
                    ping_dict['success'].update({
                        'rtt_min': float(min_avg_max.groups()[0]),
                        'rtt_avg': float(min_avg_max.groups()[1]),
                        'rtt_max': float(min_avg_max.groups()[2]),
                    })
                    results_array = []
                    for i in range(probes_received):
                        results_array.append({'ip_address': py23_compat.text_type(destination),
                                              'rtt': 0.0})
                    ping_dict['success'].update({'results': results_array})

        return ping_dict
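    # Illustrative sketch only (made-up output, not captured from a device): a final
    # IOS ping summary line such as
    #   "Success rate is 100 percent (5/5), round-trip min/avg/max = 1/2/4 ms"
    # would leave ping_dict['success'] looking roughly like:
    #   {'probes_sent': 5, 'packet_loss': 0, 'rtt_min': 1.0, 'rtt_avg': 2.0,
    #    'rtt_max': 4.0, 'rtt_stddev': 0.0,
    #    'results': [{'ip_address': u'<destination>', 'rtt': 0.0}, ...]}   # 5 entries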
    def traceroute(self, destination, source=C.TRACEROUTE_SOURCE, ttl=C.TRACEROUTE_TTL,
                   timeout=C.TRACEROUTE_TIMEOUT, vrf=C.TRACEROUTE_VRF):
        """
        Executes traceroute on the device and returns a dictionary with the result.

        :param destination: Host or IP Address of the destination
        :param source (optional): Use a specific IP Address to execute the traceroute
        :param ttl (optional): Maximum number of hops -> int (0-255)
        :param timeout (optional): Number of seconds to wait for response -> int (1-3600)

        Output dictionary has one of the following keys:

            * success
            * error

        In case of success, the keys of the dictionary represent the hop ID, while values are
        dictionaries containing the probes results:

            * rtt (float)
            * ip_address (str)
            * host_name (str)
        """
        # vrf needs to be right after the traceroute command
        if vrf:
            command = "traceroute vrf {} {}".format(vrf, destination)
        else:
            command = "traceroute {}".format(destination)
        if source:
            command += " source {}".format(source)
        if ttl:
            # TTL should be an integer between 0 and 255
            if isinstance(ttl, int) and 0 <= ttl <= 255:
                command += " ttl 0 {}".format(str(ttl))
        if timeout:
            # Timeout should be an integer between 1 and 3600
            if isinstance(timeout, int) and 1 <= timeout <= 3600:
                command += " timeout {}".format(str(timeout))

        # Calculation to leave enough time for traceroute to complete assumes send_command
        # delay of .2 seconds.
        max_loops = (5 * ttl * timeout) + 150
        if max_loops < 500:     # Make sure max_loops isn't set artificially low
            max_loops = 500
        output = self.device.send_command(command, max_loops=max_loops)

        # Prepare return dict
        traceroute_dict = dict()
        if re.search('Unrecognized host or address', output):
            traceroute_dict['error'] = 'unknown host %s' % destination
            return traceroute_dict
        else:
            traceroute_dict['success'] = dict()

        results = dict()
        # Find all hops
        hops = re.findall(r'\n\s+[0-9]{1,3}\s', output)
        for hop in hops:
            # Search for the hop in the output
            hop_match = re.search(hop, output)
            # Find the start index for the hop
            start_index = hop_match.start()
            # If this is the last hop
            if hops.index(hop) + 1 == len(hops):
                # Set the stop index for the hop to the length of the output
                stop_index = len(output)
            # else, find the start index of the next hop
            else:
                next_hop_match = re.search(hops[hops.index(hop) + 1], output)
                stop_index = next_hop_match.start()
            # With the start and stop index for each hop, the probes can be parsed.
            # Set the hop string, and remove the space before 'msec' for easier matching.
            hop_string = output[start_index:stop_index].replace(' msec', 'msec')
            hop_list = hop_string.split()
            current_hop = int(hop_list.pop(0))
            # Prepare dictionary for each hop (assuming there are 3 probes in each hop)
            results[current_hop] = dict()
            results[current_hop]['probes'] = dict()
            results[current_hop]['probes'][1] = {'rtt': float(), 'ip_address': '', 'host_name': ''}
            results[current_hop]['probes'][2] = {'rtt': float(), 'ip_address': '', 'host_name': ''}
            results[current_hop]['probes'][3] = {'rtt': float(), 'ip_address': '', 'host_name': ''}
            current_probe = 1
            ip_address = ''
            host_name = ''
            while hop_list:
                current_element = hop_list.pop(0)
                # If current_element is '*', move the index in the dictionary to the next probe
                if current_element == '*':
                    current_probe += 1
                # If current_element contains 'msec', record the entry for the probe
                elif 'msec' in current_element:
                    ip_address = py23_compat.text_type(ip_address)
                    host_name = py23_compat.text_type(host_name)
                    rtt = float(current_element.replace('msec', ''))
                    results[current_hop]['probes'][current_probe]['ip_address'] = ip_address
                    results[current_hop]['probes'][current_probe]['host_name'] = host_name
                    results[current_hop]['probes'][current_probe]['rtt'] = rtt
                    # After recording the entry, move the index to the next probe
                    current_probe += 1
                # If the element contains '(' and ')', the output format is 'FQDN (IP_ADDRESS)';
                # save the IP address
                elif '(' in current_element:
                    ip_address = current_element.replace('(', '').replace(')', '')
                # Otherwise save the element as both the probe's ip_address and host_name
                else:
                    host_name = current_element
                    ip_address = current_element

        traceroute_dict['success'] = results
        return traceroute_dict
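    # Illustrative sketch only (hop text is hypothetical): a hop line such as
    #   "  1 router1.example.net (10.0.0.1) 2 msec 1 msec 2 msec"
    # would be parsed into:
    #   results[1] = {'probes': {1: {'rtt': 2.0, 'ip_address': u'10.0.0.1',
    #                                'host_name': u'router1.example.net'},
    #                            2: {'rtt': 1.0, ...},    # same ip_address / host_name
    #                            3: {'rtt': 2.0, ...}}}   # same ip_address / host_name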
    def get_config(self, retrieve='all'):
        """Implementation of get_config for IOS.

        Returns the startup and/or running configuration as a dictionary.
        The keys of the dictionary represent the type of configuration
        (startup or running). The candidate is always an empty string,
        since IOS does not support candidate configuration.
        """
        configs = {
            'startup': '',
            'running': '',
            'candidate': '',
        }

        if retrieve in ('startup', 'all'):
            command = 'show startup-config'
            output = self._send_command(command)
            configs['startup'] = output

        if retrieve in ('running', 'all'):
            command = 'show running-config'
            output = self._send_command(command)
            configs['running'] = output

        return configs

    @property
    def dest_file_system(self):
        # The self.device check ensures napalm has an open connection
        if self.device and self._dest_file_system is None:
            self._dest_file_system = self._discover_file_system()
        return self._dest_file_system
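    # Minimal usage sketch (comment only; the hostname and credentials are placeholders,
    # and the import assumes the usual napalm-ios package layout):
    #
    #     from napalm_ios import IOSDriver
    #
    #     device = IOSDriver('192.0.2.10', 'admin', 'secret',
    #                        optional_args={'transport': 'ssh'})
    #     device.open()
    #     running = device.get_config(retrieve='running')['running']
    #     device.close()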