Source code for hammers.osrest.neutron

import collections
import collections.abc

import requests


def floatingip_delete(auth, floatingip):
    """Frees a floating IP by ID, or mapping containing an ``'id'`` key.

    :param auth: object exposing ``endpoint(service_name)`` and a ``token``
        attribute; both are used to build the request.
    :param floatingip: the floating IP ID, or a mapping whose ``'id'`` entry
        holds it.
    :returns: the ``requests`` response object.
    :raises requests.HTTPError: if the response carries an error status
        (via ``raise_for_status``).
    """
    # The ABC aliases in ``collections`` were removed in Python 3.10; the
    # ``Mapping`` ABC must be taken from ``collections.abc``.
    if isinstance(floatingip, collections.abc.Mapping):
        floatingip = floatingip['id']

    response = requests.delete(
        url=auth.endpoint('network') + '/v2.0/floatingips/{}'.format(floatingip),
        headers={'X-Auth-Token': auth.token},
    )
    response.raise_for_status()
    return response  # 204 No Content is normal.
def floatingips(auth):
    """Fetch every floating IP and index the results by their ``'id'``.

    :param auth: object exposing ``endpoint(service_name)`` and ``token``.
    :returns: dictionary mapping each floating IP's ID to its entry from the
        ``'floatingips'`` list of the JSON response.
    """
    url = auth.endpoint('network') + '/v2.0/floatingips'
    headers = {'X-Auth-Token': auth.token}

    reply = requests.get(url=url, headers=headers)
    reply.raise_for_status()

    by_id = {}
    for entry in reply.json()['floatingips']:
        by_id[entry['id']] = entry
    return by_id
def network(auth, net):
    """Gets a network by ID, or mapping containing an ``'id'`` key.

    :param auth: object exposing ``endpoint(service_name)`` and a ``token``
        attribute; both are used to build the request.
    :param net: the network ID, or a mapping whose ``'id'`` entry holds it.
    :returns: the value stored under ``'network'`` in the JSON response.
    :raises requests.HTTPError: if the response carries an error status
        (via ``raise_for_status``).
    """
    # The ABC aliases in ``collections`` were removed in Python 3.10; the
    # ``Mapping`` ABC must be taken from ``collections.abc``.
    if isinstance(net, collections.abc.Mapping):
        net = net['id']

    response = requests.get(
        url=auth.endpoint('network') + '/v2.0/networks/{}'.format(net),
        headers={'X-Auth-Token': auth.token},
    )
    response.raise_for_status()
    return response.json()['network']
def networks(auth):
    """Fetch every network and index the results by their ``'id'``.

    :param auth: object exposing ``endpoint(service_name)`` and ``token``.
    :returns: dictionary mapping each network's ID to its entry from the
        ``'networks'`` list of the JSON response.
    """
    url = auth.endpoint('network') + '/v2.0/networks'
    headers = {'X-Auth-Token': auth.token}

    reply = requests.get(url=url, headers=headers)
    reply.raise_for_status()

    payload = reply.json()
    return dict((entry['id'], entry) for entry in payload['networks'])
def port_delete(auth, port):
    """Deletes a port by ID, or mapping containing an ``'id'`` key.

    :param auth: object exposing ``endpoint(service_name)`` and a ``token``
        attribute; both are used to build the request.
    :param port: the port ID, or a mapping whose ``'id'`` entry holds it.
    :returns: the ``requests`` response object.
    :raises requests.HTTPError: if the response carries an error status
        (via ``raise_for_status``).
    """
    # The ABC aliases in ``collections`` were removed in Python 3.10; the
    # ``Mapping`` ABC must be taken from ``collections.abc``.
    if isinstance(port, collections.abc.Mapping):
        port = port['id']

    response = requests.delete(
        url=auth.endpoint('network') + '/v2.0/ports/{}'.format(port),
        headers={'X-Auth-Token': auth.token},
    )
    response.raise_for_status()
    return response
def ports(auth):
    """Fetch every port and index the results by port ``'id'``.

    :param auth: object exposing ``endpoint(service_name)`` and ``token``.
    :returns: dictionary mapping each port's ID to its entry from the
        ``'ports'`` list of the JSON response.
    """
    url = auth.endpoint('network') + '/v2.0/ports'
    headers = {'X-Auth-Token': auth.token}

    reply = requests.get(url=url, headers=headers)
    reply.raise_for_status()

    by_id = {}
    for entry in reply.json()['ports']:
        by_id[entry['id']] = entry
    return by_id
def subnet(auth, subnet):
    """Get subnet info. Accepts ID or mapping containing an ``'id'`` key.

    :param auth: object exposing ``endpoint(service_name)`` and a ``token``
        attribute; both are used to build the request.
    :param subnet: the subnet ID, or a mapping whose ``'id'`` entry holds it.
    :returns: the value stored under ``'subnet'`` in the JSON response.
    :raises requests.HTTPError: if the response carries an error status
        (via ``raise_for_status``).
    """
    # The ABC aliases in ``collections`` were removed in Python 3.10; the
    # ``Mapping`` ABC must be taken from ``collections.abc``.
    if isinstance(subnet, collections.abc.Mapping):
        subnet = subnet['id']

    response = requests.get(
        url=auth.endpoint('network') + '/v2.0/subnets/{}'.format(subnet),
        headers={'X-Auth-Token': auth.token},
    )
    response.raise_for_status()
    return response.json()['subnet']
def subnets(auth):
    """Fetch every subnet and index the results by their ``'id'``.

    :param auth: object exposing ``endpoint(service_name)`` and ``token``.
    :returns: dictionary mapping each subnet's ID to its entry from the
        ``'subnets'`` list of the JSON response.
    """
    url = auth.endpoint('network') + '/v2.0/subnets'
    headers = {'X-Auth-Token': auth.token}

    reply = requests.get(url=url, headers=headers)
    reply.raise_for_status()

    # Use a distinct local name so the function's own name is not shadowed.
    listing = reply.json()['subnets']
    return dict((entry['id'], entry) for entry in listing)
# Aliases exposing the port helpers under ``neutron_``-prefixed names.
neutron_port_delete = port_delete
neutron_ports = ports

# Only the prefixed aliases are listed for ``from ... import *``.
__all__ = ['neutron_port_delete', 'neutron_ports']