Ansible Dynamic Inventory with Forward Networks

  • 30 January 2024

Forward Networks Dynamic Inventory

 

This discussion contains the attached file ansible-fn-inventory.py and a description of how each portion of the inventory script works. It was developed for our organization to use with Ansible and AWX to perform automated remediation of STIG findings or non-standard configurations.

 

The file ansible-fn-inventory.py queries the most recent Forward Networks snapshot. It is triggered by a webhook configured in Forward Networks that fires once a snapshot completes successfully.

 

Get all devices

 

The get_devices function returns a JSON list of all the devices that were collected during the most recent snapshot. Each device entry also includes the model, manufacturer, OS version, platform, and managementIps from the collection. This function is called to add all of the devices to the Ansible inventory so playbooks can be designed to make configuration changes to networking devices.

def get_devices() -> list:
    """Returns all devices found in the latest snapshot from fwd-networks."""

    api_url = f"{HOST_URL}/api/networks/{NETWORK}/devices"
    device_list = requests.get(api_url, headers=HEADERS).json()

    return device_list
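
The function above relies on three module-level names, HOST_URL, NETWORK, and HEADERS, that are not shown in this post. A minimal sketch of how they might be declared follows. The environment variable names (FWD_HOST_URL, FWD_NETWORK_ID, FWD_ACCESS_KEY, FWD_SECRET_KEY), the load_settings helper, and the use of HTTP basic authentication are all assumptions for illustration; check the attached script and your Forward Networks credentials for the real values.

```python
import base64
import os


def load_settings(env=os.environ) -> dict:
    """Build the three constants from environment variables (hypothetical names)."""
    host_url = env.get("FWD_HOST_URL", "https://fwd.example.com").rstrip("/")
    network = env.get("FWD_NETWORK_ID", "")
    access_key = env.get("FWD_ACCESS_KEY", "")
    secret_key = env.get("FWD_SECRET_KEY", "")

    # Assumption: the API accepts HTTP basic auth built from a key pair.
    token = base64.b64encode(f"{access_key}:{secret_key}".encode()).decode()
    headers = {
        "Authorization": f"Basic {token}",
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    return {"HOST_URL": host_url, "NETWORK": network, "HEADERS": headers}


settings = load_settings()
HOST_URL = settings["HOST_URL"]
NETWORK = settings["NETWORK"]
HEADERS = settings["HEADERS"]
```

Keeping credentials in the environment (or in an AWX credential that injects them) avoids committing secrets alongside the inventory script.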

 

The primary function for automated remediation is get_violations. It accepts a query ID string; the query IDs are stored in a list (nqe_groups) of dictionaries, each containing a group_name, a query_id, and an optional hostvars_function for updating host variables. The get_violations function can be seen below.

def get_violations(query: str) -> list:
    """Runs an NQE against the desired network ID.
    Will check output to see if there is a 'violation' column.
    If there isn't, it assumes every device is a violation"""

    request_url = f'{HOST_URL}/api/nqe?networkId={NETWORK}'
    json_formatted_query = json.dumps({'queryId': query})
    nqe_results = requests.request('POST', request_url, data=json_formatted_query, headers=HEADERS).json()['items']

    violations = []
    for line in nqe_results:
        if 'violation' in line.keys():
            # Row has a 'violation' column: keep it only when the value is truthy
            if line['violation']:
                violations.append(line)
        else:
            # No 'violation' column: treat every returned row as a violation
            violations.append(line)
    return violations
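
To make the filtering rule concrete without calling the API, here is a small sketch that applies the same logic to hand-written rows. The filter_violations helper and the sample rows are hypothetical; only the rule itself (keep rows with a truthy violation, keep all rows that lack the column) comes from the function above.

```python
def filter_violations(rows: list) -> list:
    """Apply the same keep/drop rule as get_violations to already-fetched rows."""
    violations = []
    for row in rows:
        if 'violation' in row:
            if row['violation']:
                violations.append(row)
        else:
            violations.append(row)
    return violations


# Hypothetical NQE output rows
sample_rows = [
    {'device': 'sw1', 'violation': True},   # kept: explicit violation
    {'device': 'sw2', 'violation': False},  # dropped: explicitly compliant
    {'device': 'sw3'},                      # kept: no 'violation' column
]

kept = [row['device'] for row in filter_violations(sample_rows)]
print(kept)  # ['sw1', 'sw3']
```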

 

Hostvars

 

The next section in the inventory script is used for NQE hostvars functions.

These functions can be paired with an NQE to add or update a hostvars dictionary for a given device.

Each function is expected to take the current hostvars for a given device along with a violation, and return a new version of the hostvars dictionary containing any new data.

The main loop further below simply uses these functions to replace the hostvars dictionary for any violating devices. The sample function below uses data from an NQE to update a device's hostvars with a list of greTunnels (IF_TUNNEL_GRE4, IF_TUNNEL_IPSEC) that are configured on a network device that is also configured as an EIGRP stub connected device.

Hostvars functions can be written to accept a wide range of data returned from an NQE, so you can use them to make complex change management decisions.

def stub_gre_tun(current_hostvars: dict, violation: dict) -> dict:
    """Updates hostvars with all of the configured GRE tunnels on the device"""

    config_lines = violation['greTunnels']
    current_hostvars.setdefault('stub_gre_tun', {}).update({'tunnels': config_lines})
    return current_hostvars
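
As an illustration of what this function does to a single host, the sketch below runs it on a hand-written violation row. The host address and tunnel names are made up; the greTunnels key is the one the function reads.

```python
def stub_gre_tun(current_hostvars: dict, violation: dict) -> dict:
    """Same function as above, repeated so this example runs on its own."""
    config_lines = violation['greTunnels']
    current_hostvars.setdefault('stub_gre_tun', {}).update({'tunnels': config_lines})
    return current_hostvars


# Hypothetical hostvars entry and NQE row
host = {'ansible_host': '192.0.2.10'}
row = {'device': 'rtr1', 'greTunnels': ['Tunnel10', 'Tunnel20']}

updated = stub_gre_tun(host, row)
print(updated['stub_gre_tun']['tunnels'])  # ['Tunnel10', 'Tunnel20']
```

The nested key is what a playbook would reference as stub_gre_tun.tunnels for that host.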

The data returned from the above hostvars function was used in the following Ansible playbook snippet:

    - name: Add FILTER_MAP to EIGRP devices
      ios_config:
        lines:
          - 'distribute-list route-map FILTER_MAP out {{ item }}'
        parents:
          - 'router eigrp Your_Network'
          - 'address-family ipv4 unicast autonomous-system YOUR_AS'
          - 'topology base'
      loop: "{{ stub_gre_tun.tunnels }}"
      when: "'desired_groupname' in group_names"

 

The same can be done with device interfaces, using an NQE such as the following:

getInterfaceConfig(interface) =
  foreach line in interface.line.children
  select line.text;

foreach device in network.devices
foreach interface
  in patternMatches(device.files.config, `interface {name:string}`)
foreach trunkMatch
  in patternMatches(interface.line.children, `switchport mode trunk`)
let ifaceMatch = findInterface(device, interface.data.name)
where isPresent(ifaceMatch) && ifaceMatch.interface.operStatus == OperStatus.UP
let lines = getInterfaceConfig(interface)
where "switchport nonegotiate" not in lines
group interface.data.name as interface
  by { name: device.name, ip: device.platform.managementIps } as device
select {
  violation: isPresent(device.name),
  device: device.name,
  mgmtIP: device.ip,
  interface: interface
}

The above NQE returns, for each device, the list of interfaces that are active trunk links and are missing switchport nonegotiate.

def trunk_nonegotiate(current_hostvars: dict, violation: dict) -> dict:
    """This function updates a device in hostvars with violating interfaces when interfaces are returned as a list."""

    interfaces = violation['interface']
    current_hostvars.setdefault('trunk_nonegotiate', interfaces)
    return current_hostvars
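
The sketch below shows the shape of one row this function might receive and where the interface list ends up. The row values are invented, and the assumption that interface and mgmtIP arrive as lists follows from the group-by in the NQE rather than from captured output.

```python
def trunk_nonegotiate(current_hostvars: dict, violation: dict) -> dict:
    """Same function as above, repeated so this example runs on its own."""
    interfaces = violation['interface']
    current_hostvars.setdefault('trunk_nonegotiate', interfaces)
    return current_hostvars


# Hypothetical NQE row: one row per device, interfaces grouped into a list
row = {
    'violation': True,
    'device': 'sw1',
    'mgmtIP': ['192.0.2.21'],
    'interface': ['GigabitEthernet1/0/1', 'GigabitEthernet1/0/2'],
}
host = {'ansible_host': '192.0.2.21'}

trunk_nonegotiate(host, row)
print(host['trunk_nonegotiate'])  # ['GigabitEthernet1/0/1', 'GigabitEthernet1/0/2']

# setdefault keeps the first value: a second row for the same host does not overwrite it
trunk_nonegotiate(host, {'device': 'sw1', 'interface': ['GigabitEthernet1/0/9']})
print(host['trunk_nonegotiate'])  # still the original two interfaces
```

The list is stored under the trunk_nonegotiate key, so that is the variable name available to playbooks for this host. Because setdefault never overwrites, this pattern relies on the NQE returning a single grouped row per device.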

The following playbook can then call on the device group and iterate over the interfaces to apply the missing config:

- name: trunk nonegotiate
  gather_facts: false
  hosts: nonegotiate
  tasks:

    - name: Configure trunk nonegotiate
      ios_config:
        save_when: modified
        lines:
          - 'switchport nonegotiate'
        parents: 'interface {{ item }}'
      # trunk_nonegotiate is the hostvars key set by the trunk_nonegotiate function
      with_items: "{{ trunk_nonegotiate }}"

 

NQE Groups

 

The next section is where you specify which NQEs you'd like to create groups for.

New dictionary entries need to be added to the nqe_groups list.

Each entry needs the name of the group you want to create, the query ID of the NQE in Forward Networks, and a hostvars function, which can be set to None if not required.

nqe_groups = [
    {'group_name': 'portSecurity', 'query_id': 'Q_bc8aa21fa9aad13d56c6958394ec0e7f5afc3e37', 'hostvars_function': interfaceConfig},
    {'group_name': 'nonegotiate', 'query_id': 'Q_a3a61d51ce7abcf4142e193bd764eb96b6375e58', 'hostvars_function': trunk_nonegotiate},
    {'group_name': 'eigrp_stub_connected', 'query_id': 'Q_4744ea529edddb1d9d2e44487da3a102c270f172', 'hostvars_function': stub_gre_tun},
]
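
Since a typo in this list only surfaces at run time, a quick sanity check before the main loop can help. The validate_nqe_groups helper below is a hypothetical addition, not part of the attached script; it checks only the three keys described above.

```python
def validate_nqe_groups(groups: list) -> list:
    """Return a list of problems found; an empty list means the entries look usable."""
    problems = []
    seen = set()
    for index, entry in enumerate(groups):
        # group_name and query_id must be non-empty strings
        for key in ('group_name', 'query_id'):
            value = entry.get(key)
            if not isinstance(value, str) or not value:
                problems.append(f"entry {index}: missing or empty '{key}'")

        # hostvars_function must be present, and either None or a callable
        if 'hostvars_function' not in entry:
            problems.append(f"entry {index}: missing 'hostvars_function' (use None if not needed)")
        elif entry['hostvars_function'] is not None and not callable(entry['hostvars_function']):
            problems.append(f"entry {index}: 'hostvars_function' must be callable or None")

        # group names become Ansible group names, so they should be unique
        name = entry.get('group_name')
        if isinstance(name, str):
            if name in seen:
                problems.append(f"entry {index}: duplicate group_name '{name}'")
            seen.add(name)
    return problems


# Hypothetical entries with made-up query IDs
example_groups = [
    {'group_name': 'nonegotiate', 'query_id': 'Q_example1', 'hostvars_function': None},
    {'group_name': 'nonegotiate', 'query_id': '', 'hostvars_function': 'not a function'},
]
for problem in validate_nqe_groups(example_groups):
    print(problem)
```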

 

The final section of the inventory script contains the main loop, which is responsible for sorting the returned data into groups that Ansible can use as an inventory source.

if __name__ == '__main__':

    """
    Phases:

    1. Declare Constants
    2. Create empty hostvars and groups variables
    3. Get all devices from forward networks
    4. Add each device to hostvars
    5. Create groups based on specified NQEs and their violating devices

    """

    # Create empty hostvars and groups
    hostvars = {"_meta": {"hostvars": {}}}
    groups = {
        'all': {
            'vars': {
                "ansible_become": "yes",
                "ansible_become_method": "enable",
                "ansible_connection": "network_cli"
            }
        },
        'errors': {
            'vars': {}
        }
    }
    groups['all']['hosts'] = []

    # Return list of devices
    devices_list = get_devices()

    # Main Loop

    for device in devices_list:
        name = device['name']
        groups['all']['hosts'].append(name)

        # Add the device to hostvars

        hostvars['_meta']['hostvars'][device['name']] = {
            'ansible_host': device['managementIps'][0],
            'manufacturer': device.get('manufacturer'),
            'model': device.get('model'),
            'platform': device.get('platform'),
            'network_os_version': device.get('osVersion'),
        }

    # Create a group for each NQE and its associated failed devices
    # Create new hostvars dictionary if hostvars function is present

    for nqe_group in nqe_groups:

        try:
            group_name = nqe_group['group_name']
            query_id = nqe_group['query_id']
            violations = get_violations(query_id)

            # Use set comprehension to create a list of unique device names.
            # This way an NQE can provide multiple lines with the same device name
            # and the device won't be added to the group more than once
            hosts = list({violation['device'] for violation in violations})

            groups[group_name] = {
                'hosts': hosts
            }

            if nqe_group['hostvars_function']:
                func = nqe_group['hostvars_function']
                for violation in violations:
                    current_hostvars = hostvars['_meta']['hostvars'][violation['device']]
                    new_hostvars = func(current_hostvars, violation)
                    hostvars['_meta']['hostvars'][violation['device']].update(new_hostvars)

        except Exception as e:
            # Record the failure under the 'errors' group instead of aborting the inventory
            if hasattr(e, 'message'):
                groups['errors']['vars'].setdefault(group_name, []).append(e.message)
            else:
                groups['errors']['vars'].setdefault(group_name, []).append(repr(e))

    inventory = {}
    inventory.update(hostvars)
    inventory.update(groups)

    print(json.dumps(inventory, indent=4, sort_keys=True))
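
Ansible runs an inventory script with --list and, when the output has no top-level _meta key, additionally with --host <name> for each host. The script above ignores its arguments, which works because it always prints the full inventory including _meta. For anyone who prefers explicit argument handling, a small sketch follows; the render helper and the sample inventory are hypothetical and not part of the attached file.

```python
import argparse
import json


def render(inventory: dict, argv: list) -> str:
    """Return the JSON Ansible expects for --list or --host <name>."""
    parser = argparse.ArgumentParser(description="Inventory script argument handling (sketch)")
    parser.add_argument('--list', action='store_true', help='print the full inventory')
    parser.add_argument('--host', help='print hostvars for a single host')
    args = parser.parse_args(argv)

    if args.host:
        # Unknown hosts get an empty dictionary rather than an error
        host_vars = inventory.get('_meta', {}).get('hostvars', {}).get(args.host, {})
        return json.dumps(host_vars, indent=4, sort_keys=True)

    # Default (and --list): the whole inventory, including _meta
    return json.dumps(inventory, indent=4, sort_keys=True)


# Hypothetical inventory in the same shape the main loop builds
sample_inventory = {
    '_meta': {'hostvars': {'sw1': {'ansible_host': '192.0.2.21'}}},
    'all': {'hosts': ['sw1']},
    'nonegotiate': {'hosts': ['sw1']},
}

print(render(sample_inventory, ['--host', 'sw1']))
```

In the real script, the final print could become print(render(inventory, sys.argv[1:])) after importing sys.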

 


1 reply

@djhoward12 This is fantastic! We will likely tweak it and use it similarly.
