astakos_... --> admin_...
e.g.,
astakos_services --> admin_service, astakos_commission --> admin_commission
+4. Implement OpenStack Network API 2.0, with synnefo/cyclades extensions. New:
+ network info/list/create/delete/set
+ subnet info/list/create/set
+ port info/list/create/delete/set
id_suff=ValueArgument(
'filter by id suffix (case insensitive)', '--id-suffix'),
id_like=ValueArgument(
- 'print only if id contains this (case insensitive)',
- '--id-like')
+ 'print only if id contains this (case insensitive)', '--id-like')
)
def _non_exact_id_filter(self, items):
'* get a list of network ids: /network list',
'* details of network: /network info <network id>']
+ net_types = ('CUSTOM', 'MAC_FILTERED', 'IP_LESS_ROUTED', 'PHYSICAL_VLAN')
+
@classmethod
def connection(this, foo):
return generic._connection(foo)
return _raise
@classmethod
+ def network_type(this, foo):
+ def _raise(self, *args, **kwargs):
+ network_type = kwargs.get('network_type', None)
+ msg = 'Invalid network type %s.\nValid types: %s' % (
+ network_type, ' '.join(this.net_types))
+ assert network_type in this.net_types, msg
+ return foo(self, *args, **kwargs)
+ return _raise
+
+ @classmethod
def network_max(this, foo):
def _raise(self, *args, **kwargs):
try:
--- /dev/null
+# Copyright 2011-2013 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from io import StringIO
+from pydoc import pager
+
+from kamaki.cli import command
+from kamaki.cli.command_tree import CommandTree
+from kamaki.cli.errors import (
+ CLISyntaxError, CLIBaseUrlError, CLIInvalidArgument)
+from kamaki.clients.cyclades import CycladesNetworkClient
+from kamaki.cli.argument import FlagArgument, ValueArgument, RepeatableArgument
+from kamaki.cli.commands import _command_init, errors, addLogSettings
+from kamaki.cli.commands import (
+ _optional_output_cmd, _optional_json, _name_filter, _id_filter)
+from kamaki.cli.utils import filter_dicts_by_dict
+
+
+network_cmds = CommandTree('network', 'Networking API network commands')
+port_cmds = CommandTree('port', 'Networking API network commands')
+subnet_cmds = CommandTree('subnet', 'Networking API network commands')
+_commands = [network_cmds, port_cmds, subnet_cmds]
+
+
+about_authentication = '\nUser Authentication:\
+ \n* to check authentication: /user authenticate\
+ \n* to set authentication token: /config set cloud.<cloud>.token <token>'
+
+
+class _init_network(_command_init):
+ @errors.generic.all
+ @addLogSettings
+ def _run(self, service='network'):
+ if getattr(self, 'cloud', None):
+ base_url = self._custom_url(service) or self._custom_url(
+ 'compute')
+ if base_url:
+ token = self._custom_token(service) or self._custom_token(
+ 'compute') or self.config.get_cloud('token')
+ self.client = CycladesNetworkClient(
+ base_url=base_url, token=token)
+ return
+ else:
+ self.cloud = 'default'
+ if getattr(self, 'auth_base', False):
+ cyclades_endpoints = self.auth_base.get_service_endpoints(
+ self._custom_type('compute') or 'compute',
+ self._custom_version('compute') or '')
+ base_url = cyclades_endpoints['publicURL']
+ token = self.auth_base.token
+ self.client = CycladesNetworkClient(base_url=base_url, token=token)
+ else:
+ raise CLIBaseUrlError(service='network')
+
+ def main(self):
+ self._run()
+
+
+@command(network_cmds)
+class network_list(_init_network, _optional_json, _name_filter, _id_filter):
+ """List networks
+ Use filtering arguments (e.g., --name-like) to manage long server lists
+ """
+
+ arguments = dict(
+ detail=FlagArgument('show detailed output', ('-l', '--details')),
+ more=FlagArgument(
+ 'output results in pages (-n to set items per page, default 10)',
+ '--more'),
+ user_id=ValueArgument(
+ 'show only networks belonging to user with this id', '--user-id')
+ )
+
+ def _filter_by_user_id(self, nets):
+ return filter_dicts_by_dict(nets, dict(user_id=self['user_id'])) if (
+ self['user_id']) else nets
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ def _run(self):
+ detail = self['detail'] or self['user_id']
+ nets = self.client.list_networks(detail=detail)
+ nets = self._filter_by_user_id(nets)
+ nets = self._filter_by_name(nets)
+ nets = self._filter_by_id(nets)
+ if detail and not self['detail']:
+ nets = [dict(
+ id=n['id'], name=n['name'], links=n['links']) for n in nets]
+ kwargs = dict()
+ if self['more']:
+ kwargs['out'] = StringIO()
+ kwargs['title'] = ()
+ self._print(nets, **kwargs)
+ if self['more']:
+ pager(kwargs['out'].getvalue())
+
+ def main(self):
+ super(self.__class__, self)._run()
+ self._run()
+
+
+@command(network_cmds)
+class network_info(_init_network, _optional_json):
+ """Get details about a network"""
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ @errors.cyclades.network_id
+ def _run(self, network_id):
+ net = self.client.get_network_details(network_id)
+ self._print(net, self.print_dict)
+
+ def main(self, network_id):
+ super(self.__class__, self)._run()
+ self._run(network_id=network_id)
+
+
+@command(network_cmds)
+class network_create(_init_network, _optional_json):
+ """Create a new network
+ Valid network types: CUSTOM MAC_FILTERED IP_LESS_ROUTED PHYSICAL_VLAN
+ """
+
+ arguments = dict(
+ name=ValueArgument('Network name', '--name'),
+ shared=FlagArgument(
+ 'Make network shared (special privileges required)', '--shared')
+ )
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ @errors.cyclades.network_type
+ def _run(self, network_type):
+ net = self.client.create_network(
+ network_type, name=self['name'], shared=self['shared'])
+ self._print(net, self.print_dict)
+
+ def main(self, network_type):
+ super(self.__class__, self)._run()
+ self._run(network_type=network_type)
+
+
+@command(network_cmds)
+class network_delete(_init_network, _optional_output_cmd):
+ """Delete a network"""
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ @errors.cyclades.network_id
+ def _run(self, network_id):
+ r = self.client.delete_network(network_id)
+ self._optional_output(r)
+
+ def main(self, network_id):
+ super(self.__class__, self)._run()
+ self._run(network_id=network_id)
+
+
+@command(network_cmds)
+class network_set(_init_network, _optional_json):
+ """Set an attribute of a network, leave the rest untouched (update)
+ Only "--name" is supported for now
+ """
+
+ arguments = dict(name=ValueArgument('New name of the network', '--name'))
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ @errors.cyclades.network_id
+ def _run(self, network_id):
+ if self['name'] in (None, ):
+ raise CLISyntaxError(
+ 'Missing network attributes to update',
+ details=[
+ 'At least one if the following is expected:',
+ ' --name=<new name>'])
+ r = self.client.update_network(network_id, name=self['name'])
+ self._print(r, self.print_dict)
+
+ def main(self, network_id):
+ super(self.__class__, self)._run()
+ self._run(network_id=network_id)
+
+
+@command(subnet_cmds)
+class subnet_list(_init_network, _optional_json, _name_filter, _id_filter):
+ """List subnets
+ Use filtering arguments (e.g., --name-like) to manage long server lists
+ """
+
+ arguments = dict(
+ detail=FlagArgument('show detailed output', ('-l', '--details')),
+ more=FlagArgument(
+ 'output results in pages (-n to set items per page, default 10)',
+ '--more'),
+ user_id=ValueArgument(
+ 'show only subnets belonging to user with this id', '--user-id')
+ )
+
+ def _filter_by_user_id(self, nets):
+ return filter_dicts_by_dict(nets, dict(user_id=self['user_id'])) if (
+ self['user_id']) else nets
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ def _run(self):
+ detail = self['detail'] or self['user_id']
+ nets = self.client.list_subnets()
+ nets = self._filter_by_user_id(nets)
+ nets = self._filter_by_name(nets)
+ nets = self._filter_by_id(nets)
+ if detail and not self['detail']:
+ nets = [dict(
+ id=n['id'], name=n['name'], links=n['links']) for n in nets]
+ kwargs = dict()
+ if self['more']:
+ kwargs['out'] = StringIO()
+ kwargs['title'] = ()
+ self._print(nets, **kwargs)
+ if self['more']:
+ pager(kwargs['out'].getvalue())
+
+ def main(self):
+ super(self.__class__, self)._run()
+ self._run()
+
+
+@command(subnet_cmds)
+class subnet_info(_init_network, _optional_json):
+ """Get details about a subnet"""
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ def _run(self, subnet_id):
+ net = self.client.get_subnet_details(subnet_id)
+ self._print(net, self.print_dict)
+
+ def main(self, subnet_id):
+ super(self.__class__, self)._run()
+ self._run(subnet_id=subnet_id)
+
+
+class AllocationPoolArgument(RepeatableArgument):
+
+ @property
+ def value(self):
+ return super(AllocationPoolArgument, self).value or []
+
+ @value.setter
+ def value(self, new_pools):
+ new_list = []
+ for pool in new_pools:
+ start, comma, end = pool.partition(',')
+ if not (start and comma and end):
+ raise CLIInvalidArgument(
+ 'Invalid allocation pool argument %s' % pool, details=[
+ 'Allocation values must be of the form:',
+ ' <start address>,<end address>'])
+ new_list.append(dict(start=start, end=end))
+ self._value = new_list
+
+
+@command(subnet_cmds)
+class subnet_create(_init_network, _optional_json):
+ """Create a new subnet
+ """
+
+ arguments = dict(
+ name=ValueArgument('Subnet name', '--name'),
+ allocation_pools=AllocationPoolArgument(
+ 'start_address,end_address of allocation pool (can be repeated)'
+ ' e.g., --alloc-pool=123.45.67.1,123.45.67.8',
+ '--alloc-pool'),
+ gateway=ValueArgument('Gateway IP', '--gateway'),
+ subnet_id=ValueArgument('The id for the subnet', '--id'),
+ ipv6=FlagArgument('If set, IP version is set to 6, else 4', '--ipv6'),
+ enable_dhcp=FlagArgument('Enable dhcp (default: off)', '--with-dhcp')
+ )
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ @errors.cyclades.network_id
+ def _run(self, network_id, cidr):
+ net = self.client.create_subnet(
+ network_id, cidr,
+ self['name'], self['allocation_pools'], self['gateway'],
+ self['subnet_id'], self['ipv6'], self['enable_dhcp'])
+ self._print(net, self.print_dict)
+
+ def main(self, network_id, cidr):
+ super(self.__class__, self)._run()
+ self._run(network_id=network_id, cidr=cidr)
+
+
+# @command(subnet_cmds)
+# class subnet_delete(_init_network, _optional_output_cmd):
+# """Delete a subnet"""
+
+# @errors.generic.all
+# @errors.cyclades.connection
+# def _run(self, subnet_id):
+# r = self.client.delete_subnet(subnet_id)
+# self._optional_output(r)
+
+# def main(self, subnet_id):
+# super(self.__class__, self)._run()
+# self._run(subnet_id=subnet_id)
+
+
+@command(subnet_cmds)
+class subnet_set(_init_network, _optional_json):
+ """Set an attribute of a subnet, leave the rest untouched (update)
+ Only "--name" is supported for now
+ """
+
+ arguments = dict(name=ValueArgument('New name of the subnet', '--name'))
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ def _run(self, subnet_id):
+ if self['name'] in (None, ):
+ raise CLISyntaxError(
+ 'Missing subnet attributes to update',
+ details=[
+ 'At least one if the following is expected:',
+ ' --name=<new name>'])
+ r = self.client.get_subnet_details(subnet_id)
+ r = self.client.update_subnet(
+ subnet_id, r['network_id'], name=self['name'])
+ self._print(r, self.print_dict)
+
+ def main(self, subnet_id):
+ super(self.__class__, self)._run()
+ self._run(subnet_id=subnet_id)
+
+
+@command(port_cmds)
+class port_list(_init_network, _optional_json):
+ """List all ports"""
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ def _run(self):
+ net = self.client.list_ports()
+ self._print(net)
+
+ def main(self):
+ super(self.__class__, self)._run()
+ self._run()
+
+
+@command(port_cmds)
+class port_info(_init_network, _optional_json):
+ """Get details about a port"""
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ def _run(self, port_id):
+ net = self.client.get_port_details(port_id)
+ self._print(net, self.print_dict)
+
+ def main(self, port_id):
+ super(self.__class__, self)._run()
+ self._run(port_id=port_id)
+
+
+@command(port_cmds)
+class port_delete(_init_network, _optional_output_cmd):
+ """Delete a port"""
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ def _run(self, port_id):
+ r = self.client.delete_port(port_id)
+ self._optional_output(r)
+
+ def main(self, port_id):
+ super(self.__class__, self)._run()
+ self._run(port_id=port_id)
+
+
+@command(port_cmds)
+class port_set(_init_network, _optional_json):
+ """Set an attribute of a port, leave the rest untouched (update)
+ Only "--name" is supported for now
+ """
+
+ arguments = dict(name=ValueArgument('New name of the port', '--name'))
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ def _run(self, port_id):
+ if self['name'] in (None, ):
+ raise CLISyntaxError(
+ 'Missing port attributes to update',
+ details=[
+ 'At least one if the following is expected:',
+ ' --name=<new name>'])
+ r = self.client.get_port_details(port_id)
+ r = self.client.update_port(
+ port_id, r['network_id'], name=self['name'])
+ self._print(r, self.print_dict)
+
+ def main(self, port_id):
+ super(self.__class__, self)._run()
+ self._run(port_id=port_id)
+
+
+@command(port_cmds)
+class port_create(_init_network, _optional_json):
+ """Create a new port"""
+
+ arguments = dict(
+ name=ValueArgument('A human readable name', '--name'),
+ security_group_id=RepeatableArgument(
+ 'Add a security group id (can be repeated)',
+ ('-g', '--security-group')),
+ subnet_id=ValueArgument(
+ 'Subnet id for fixed ips (used with --ip-address)',
+ '--subnet-id'),
+ ip_address=ValueArgument(
+ 'IP address for subnet id (used with --subnet-id', '--ip-address')
+ )
+
+ @errors.generic.all
+ @errors.cyclades.connection
+ @errors.cyclades.network_id
+ def _run(self, network_id, device_id):
+ if not (bool(self['subnet_id']) ^ bool(self['ip_address'])):
+ raise CLIInvalidArgument('Invalid use of arguments', details=[
+ '--subned-id and --ip-address should be used together'])
+ fixed_ips = dict(
+ subnet_id=self['subnet_id'], ip_address=self['ip_address']) if (
+ self['subnet_id']) else None
+ r = self.client.create_port(
+ network_id, device_id,
+ name=self['name'],
+ security_groups=self['security_group_id'],
+ fixed_ips=fixed_ips)
+ self._print(r, self.print_dict)
+
+ def main(self, network_id, device_id):
+ super(self.__class__, self)._run()
+ self._run(network_id=network_id, device_id=device_id)
'file_cli': 'pithos',
'server_cli': 'cyclades',
'flavor_cli': 'cyclades',
- 'network_cli': 'cyclades',
+ 'network_cli': 'network',
+ 'subnet_cli': 'network',
+ 'port_cli': 'network',
'ip_cli': 'cyclades',
'image_cli': 'image',
'config_cli': 'config',
from time import sleep
from kamaki.clients.cyclades.rest_api import CycladesRestClient
+from kamaki.clients.network import NetworkClient
+from kamaki.clients.utils import path4url
from kamaki.clients import ClientError
req = dict(removeFloatingIp=dict(address=address))
r = self.servers_action_post(server_id, json_data=req)
return r.headers
+
+
+class CycladesNetworkClient(NetworkClient):
+ """Cyclades Network API extentions"""
+
+ network_types = (
+ 'CUSTOM', 'MAC_FILTERED', 'IP_LESS_ROUTED', 'PHYSICAL_VLAN')
+
+ def list_networks(self, detail=None):
+ path = path4url('networks', 'detail' if detail else '')
+ r = self.get(path, success=200)
+ return r.json['networks']
+
+ def create_network(self, type, name=None, shared=None):
+ req = dict(network=dict(type=type, admin_state_up=True))
+ if name:
+ req['network']['name'] = name
+ if shared not in (None, ):
+ req['network']['shared'] = bool(shared)
+ r = self.networks_post(json_data=req, success=201)
+ return r.json['network']
+
+ def create_port(
+ self, network_id, device_id,
+ security_groups=None, name=None, fixed_ips=None):
+ port = dict(network_id=network_id, device_id=device_id)
+ if security_groups:
+ port['security_groups'] = security_groups
+ if name:
+ port['name'] = name
+ if fixed_ips:
+ diff = set(['subnet_id', 'ip_address']).difference(fixed_ips)
+ if diff:
+ raise ValueError(
+ 'Invalid format for "fixed_ips", %s missing' % diff)
+ port['fixed_ips'] = fixed_ips
+ r = self.ports_post(json_data=dict(port=port), success=201)
+ return r.json['port']
'/os-floating-ips/%s' % fip, success=success, **kwargs))
+class CycladesNetworkClient(TestCase):
+
+ """Set up a ComputesRest thorough test"""
+ def setUp(self):
+ self.url = 'http://network.example.com'
+ self.token = 'n2tw0rk70k3n'
+ self.client = cyclades.CycladesNetworkClient(self.url, self.token)
+
+ def tearDown(self):
+ FR.json = vm_recv
+ del self.client
+
+ @patch('kamaki.clients.Client.get', return_value=FR)
+ def test_list_networks(self, get):
+ FR.json = dict(networks='ret val')
+ for detail in (True, None):
+ self.assertEqual(self.client.list_networks(detail), 'ret val')
+ path = '/networks/detail' if detail else '/networks'
+ self.assertEqual(get.mock_calls[-1], call(path, success=200))
+
+ @patch(
+ 'kamaki.clients.network.rest_api.NetworkRestClient.networks_post',
+ return_value=FR())
+ def test_create_network(self, networks_post):
+ for name, shared in product((None, 'net name'), (None, True)):
+ FR.json = dict(network='ret val')
+ type = 'net type'
+ self.assertEqual(
+ self.client.create_network(type, name=name, shared=shared),
+ 'ret val')
+ req = dict(type=type, admin_state_up=True)
+ if name:
+ req['name'] = name
+ if shared:
+ req['shared'] = shared
+ expargs = dict(json_data=dict(network=req), success=201)
+ self.assertEqual(networks_post.mock_calls[-1], call(**expargs))
+
+ @patch(
+ 'kamaki.clients.network.rest_api.NetworkRestClient.ports_post',
+ return_value=FR)
+ def test_create_port(self, ports_post):
+ network_id, device_id, FR.json = 'netid', 'devid', dict(port='ret v')
+ for name, sec_grp, fixed_ips in product(
+ ('port name', None),
+ ([1, 2, 3], None),
+ (
+ dict(subnet_id='sid', ip_address='ipa'),
+ dict(subnet_id='sid'), dict(ip_address='ipa'),
+ None)):
+
+ if fixed_ips:
+ diff = set(['subnet_id', 'ip_address']).difference(fixed_ips)
+ if diff:
+ self.assertRaises(
+ ValueError, self.client.create_port,
+ network_id, device_id,
+ name=name,
+ security_groups=sec_grp,
+ fixed_ips=fixed_ips)
+ continue
+ self.assertEqual(
+ self.client.create_port(
+ network_id, device_id,
+ name=name, security_groups=sec_grp, fixed_ips=fixed_ips),
+ 'ret v')
+ req = dict(network_id=network_id, device_id=device_id)
+ if sec_grp:
+ req['security_groups'] = sec_grp
+ if name:
+ req['name'] = name
+ if fixed_ips:
+ req['fixed_ips'] = fixed_ips
+ expargs = dict(json_data=dict(port=req), success=201)
+ self.assertEqual(ports_post.mock_calls[-1], call(**expargs))
+
+
class CycladesClient(TestCase):
def assert_dicts_are_equal(self, d1, d2):
not_found = True
if not argv[1:] or argv[1] == 'CycladesClient':
not_found = False
runTestCase(CycladesClient, 'Cyclades Client', argv[2:])
+ if not argv[1:] or argv[1] == 'CycladesNetworkClient':
+ not_found = False
+ runTestCase(CycladesNetworkClient, 'CycladesNetwork Client', argv[2:])
if not argv[1:] or argv[1] == 'CycladesRestClient':
not_found = False
runTestCase(CycladesRestClient, 'CycladesRest Client', argv[2:])
--- /dev/null
+# Copyright 2013 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from kamaki.clients import ClientError
+from kamaki.clients.network.rest_api import NetworkRestClient
+
+
+class NetworkClient(NetworkRestClient):
+ """OpenStack Network API 2.0 client"""
+
+ def list_networks(self):
+ r = self.networks_get(success=200)
+ return r.json['networks']
+
+ def create_network(self, name, admin_state_up=None, shared=None):
+ req = dict(network=dict(
+ name=name, admin_state_up=bool(admin_state_up)))
+ if shared not in (None, ):
+ req['network']['shared'] = bool(shared)
+ r = self.networks_post(json_data=req, success=201)
+ return r.json['network']
+
+ def create_networks(self, networks):
+ """Atomic operation for batch network creation (all or nothing)
+ :param networks: (list) [
+ {name: ..(str).., admin_state_up: ..(bool).., shared: ..(bool)..},
+ {name: ..(str).., admin_state_up: ..(bool).., shared: ..(bool)..}]
+ name is mandatory, the rest is optional
+ e.g., create_networks([
+ {name: 'net1', admin_state_up: True},
+ {name: 'net2'}])
+ :returns: (list of dicts) created networks details
+ :raises ValueError: if networks is misformated
+ :raises ClientError: if the request failed or didn't return 201
+ """
+ try:
+ msg = 'The networks parameter must be list or tuple'
+ assert (
+ isinstance(networks, list) or isinstance(networks, tuple)), msg
+ for network in networks:
+ msg = 'Network specification %s is not a dict' % network
+ assert isinstance(network, dict), msg
+ err = set(network).difference(
+ ('name', 'admin_state_up', 'shared'))
+ if err:
+ raise ValueError(
+ 'Invalid key(s): %s in network specification %s' % (
+ err, network))
+ msg = 'Name is missing in network specification: %s' % network
+ assert network.get('name', None), msg
+ network.setdefault('admin_state_up', False)
+ except AssertionError as ae:
+ raise ValueError('%s' % ae)
+
+ req = dict(networks=list(networks))
+ r = self.networks_post(json_data=req, success=201)
+ return r.json['networks']
+
+ def get_network_details(self, network_id):
+ r = self.networks_get(network_id, success=200)
+ return r.json['network']
+
+ def update_network(
+ self, network_id, name=None, admin_state_up=None, shared=None):
+ network = dict()
+ if name:
+ network['name'] = name
+ if admin_state_up not in (None, ):
+ network['admin_state_up'] = admin_state_up
+ if shared not in (None, ):
+ network['shared'] = shared
+ network = dict(network=network)
+ r = self.networks_put(network_id, json_data=network, success=200)
+ return r.json['network']
+
+ def delete_network(self, network_id):
+ r = self.networks_delete(network_id, success=204)
+ return r.headers
+
+ def list_subnets(self):
+ r = self.subnets_get(success=200)
+ return r.json['subnets']
+
+ def create_subnet(
+ self, network_id, cidr,
+ name=None, allocation_pools=None, gateway_ip=None, subnet_id=None,
+ ipv6=None, enable_dhcp=None):
+ """
+ :param network_id: (str)
+ :param cidr: (str)
+
+ :param name: (str) The subnet name
+ :param allocation_pools: (list of dicts) start/end addresses of
+ allocation pools: [{'start': ..., 'end': ...}, ...]
+ :param gateway_ip: (str)
+ :param subnet_id: (str)
+ :param ipv6: (bool) ip_version == 6 if true else 4 (default)
+ :param enable_dhcp: (bool)
+ """
+ subnet = dict(
+ network_id=network_id, cidr=cidr, ip_version=6 if ipv6 else 4)
+ if name:
+ subnet['name'] = name
+ if allocation_pools:
+ subnet['allocation_pools'] = allocation_pools
+ if gateway_ip:
+ subnet['gateway_ip'] = gateway_ip
+ if subnet_id:
+ subnet['id'] = subnet_id
+ if enable_dhcp not in (None, ):
+ subnet['enable_dhcp'] = bool(enable_dhcp)
+ r = self.subnets_post(json_data=dict(subnet=subnet), success=201)
+ return r.json['subnet']
+
+ def create_subnets(self, subnets):
+ """Atomic operation for batch subnet creation (all or nothing)
+ :param subnets: (list of dicts) {key: ...} with all parameters in the
+ method create_subnet, where method mandatory / optional paramteres
+ respond to mandatory / optional paramters in subnets items
+ :returns: (list of dicts) created subnetss details
+ :raises ValueError: if subnets parameter is incorrectly formated
+ :raises ClientError: if the request failed or didn't return 201
+ """
+ try:
+ msg = 'The subnets parameter must be list or tuple'
+ assert (
+ isinstance(subnets, list) or isinstance(subnets, tuple)), msg
+ for subnet in subnets:
+ msg = 'Subnet specification %s is not a dict' % subnet
+ assert isinstance(subnet, dict), msg
+ err = set(subnet).difference((
+ 'network_id', 'cidr', 'name', 'allocation_pools',
+ 'gateway_ip', 'subnet_id', 'ipv6', 'enable_dhcp'))
+ if err:
+ raise ValueError(
+ 'Invalid key(s): %s in subnet specification %s' % (
+ err, subnet))
+ msg = 'network_id is missing in subnet spec: %s' % subnet
+ assert subnet.get('network_id', None), msg
+ msg = 'cidr is missing in subnet spec: %s' % subnet
+ assert subnet.get('cidr', None), msg
+ subnet['ip_version'] = 6 if subnet.pop('ipv6', None) else 4
+ if 'subnet_id' in subnet:
+ subnet['id'] = subnet.pop('subnet_id')
+ except AssertionError as ae:
+ raise ValueError('%s' % ae)
+
+ r = self.subnets_post(
+ json_data=dict(subnets=list(subnets)), success=201)
+ return r.json['subnets']
+
+ def get_subnet_details(self, subnet_id):
+ r = self.subnets_get(subnet_id, success=201)
+ return r.json
+
+ def update_subnet(
+ self, network_id, cidr,
+ name=None, allocation_pools=None, gateway_ip=None, subnet_id=None,
+ ipv6=None, enable_dhcp=None):
+ """
+ :param network_id: (str) used as filter
+ :param cidr: (str) used as filter
+
+ :param name: (str) The subnet name
+ :param allocation_pools: (list of dicts) start/end addresses of
+ allocation pools: [{'start': ..., 'end': ...}, ...]
+ :param gateway_ip: (str)
+ :param subnet_id: (str)
+ :param ipv6: (bool) ip_version == 6 if true, 4 if false, used as filter
+ :param enable_dhcp: (bool)
+ """
+ subnet = dict(network_id=network_id, cidr=cidr)
+ if name not in (None, ):
+ subnet['name'] = name
+ if allocation_pools not in (None, ):
+ subnet['allocation_pools'] = allocation_pools
+ if gateway_ip not in (None, ):
+ subnet['gateway_ip'] = gateway_ip
+ if subnet_id not in (None, ):
+ subnet['id'] = subnet_id
+ if ipv6 not in (None, ):
+ subnet['ip_version'] = 6 if ipv6 else 4
+ if enable_dhcp not in (None, ):
+ subnet['enable_dhcp'] = enable_dhcp
+ r = self.subnets_put(json_data=dict(subnet=subnet), success=201)
+ return r.json['subnet']
+
+ def delete_subnet(self, subnet_id):
+ r = self.subnets_delete(subnet_id, success=204)
+ return r.headers
+
+ def list_ports(self):
+ r = self.ports_get(success=200)
+ return r.json['ports']
+
+ def create_port(
+ self, network_id,
+ name=None, status=None, admin_state_up=None, mac_address=None,
+ fixed_ips=None, security_groups=None):
+ """
+ :param network_id: (str)
+
+ :param name: (str)
+ :param status: (str)
+ :param admin_state_up: (bool) Router administrative status (UP / DOWN)
+ :param mac_address: (str)
+ :param fixed_ips: (str)
+ :param security_groups: (list)
+ """
+ port = dict(network_id=network_id)
+ if name:
+ port['name'] = name
+ if status:
+ port['status'] = status
+ if admin_state_up not in (None, ):
+ port['admin_state_up'] = bool(admin_state_up)
+ if mac_address:
+ port['mac_address'] = mac_address
+ if fixed_ips:
+ port['fixed_ips'] = fixed_ips
+ if security_groups:
+ port['security_groups'] = security_groups
+ r = self.ports_post(json_data=dict(port=port), success=201)
+ return r.json['port']
+
+ def create_ports(self, ports):
+ """Atomic operation for batch port creation (all or nothing)
+ :param ports: (list of dicts) {key: ...} with all parameters in the
+ method create_port, where method mandatory / optional paramteres
+ respond to mandatory / optional paramters in ports items
+ :returns: (list of dicts) created portss details
+ :raises ValueError: if ports parameter is incorrectly formated
+ :raises ClientError: if the request failed or didn't return 201
+ """
+ try:
+ msg = 'The ports parameter must be list or tuple'
+ assert (
+ isinstance(ports, list) or isinstance(ports, tuple)), msg
+ for port in ports:
+ msg = 'Subnet specification %s is not a dict' % port
+ assert isinstance(port, dict), msg
+ err = set(port).difference((
+ 'network_id', 'status', 'name', 'admin_state_up',
+ 'mac_address', 'fixed_ips', 'security_groups'))
+ if err:
+ raise ValueError(
+ 'Invalid key(s): %s in port specification %s' % (
+ err, port))
+ msg = 'network_id is missing in port spec: %s' % port
+ assert port.get('network_id', None), msg
+ except AssertionError as ae:
+ raise ValueError('%s' % ae)
+ r = self.ports_post(json_data=dict(ports=list(ports)), success=201)
+ return r.json['ports']
+
+ def get_port_details(self, port_id):
+ r = self.ports_get(port_id, success=201)
+ return r.json['port']
+
+ def delete_port(self, port_id):
+ r = self.ports_delete(port_id, success=204)
+ return r.headers
+
+ def update_port(
+ self, port_id, network_id,
+ name=None, status=None, admin_state_up=None, mac_address=None,
+ fixed_ips=None, security_groups=None):
+ """
+ :param network_id: (str)
+
+ :param name: (str)
+ :param status: (str)
+ :param admin_state_up: (bool) Router administrative status (UP / DOWN)
+ :param mac_address: (str)
+ :param fixed_ips: (str)
+ :param security_groups: (list)
+ """
+ port = dict(network_id=network_id)
+ if name:
+ port['name'] = name
+ if status:
+ port['status'] = status
+ if admin_state_up not in (None, ):
+ port['admin_state_up'] = bool(admin_state_up)
+ if mac_address:
+ port['mac_address'] = mac_address
+ if fixed_ips:
+ port['fixed_ips'] = fixed_ips
+ if security_groups:
+ port['security_groups'] = security_groups
+ r = self.ports_put(port_id, json_data=dict(port=port), success=201)
+ return r.json['port']
--- /dev/null
+# Copyright 2013 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from kamaki.clients import Client
+from kamaki.clients.utils import path4url
+from json import dumps
+
+
+class NetworkRestClient(Client):
+
+ def networks_get(self, network_id=None, **kwargs):
+ if network_id:
+ return self.get(path4url('networks', network_id), **kwargs)
+ return self.get(path4url('networks'), **kwargs)
+
+ def networks_post(self, json_data, **kwargs):
+ return self.post(path4url('networks'), json=json_data, **kwargs)
+
+ def networks_put(self, network_id, json_data, **kwargs):
+ return self.put(
+ path4url('networks', network_id), json=json_data, **kwargs)
+
+ def networks_delete(self, network_id, **kwargs):
+ return self.delete(path4url('networks', network_id), **kwargs)
+
+ def subnets_get(self, subnet_id=None, **kwargs):
+ if subnet_id:
+ return self.get(path4url('subnets', subnet_id), **kwargs)
+ return self.get(path4url('subnets'), **kwargs)
+
+ def subnets_post(self, json_data, **kwargs):
+ return self.post(path4url('subnets'), json=json_data, **kwargs)
+
+ def subnets_put(self, subnet_id, **kwargs):
+ return self.put(path4url('subnets', subnet_id), **kwargs)
+
+ def subnets_delete(self, subnet_id, **kwargs):
+ return self.delete(path4url('subnets', subnet_id), **kwargs)
+
+ def ports_get(self, port_id=None, **kwargs):
+ if port_id:
+ return self.get(path4url('ports', port_id), **kwargs)
+ return self.get(path4url('ports'), **kwargs)
+
+ def ports_post(self, json_data=None, **kwargs):
+ return self.post(path4url('ports'), json=json_data, **kwargs)
+
+ def ports_put(self, port_id, json_data=None, **kwargs):
+ return self.put(path4url('ports', port_id), json=json_data, **kwargs)
+
+ def ports_delete(self, port_id, **kwargs):
+ return self.delete(path4url('ports', port_id), **kwargs)
--- /dev/null
+# Copyright 2013 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from mock import patch, call
+from unittest import TestCase
+from itertools import product
+from json import dumps
+
+from kamaki.clients import network
+
+
+class NetworkRestClient(TestCase):
+
+ """Set up a ComputesRest thorough test"""
+ def setUp(self):
+ self.url = 'http://network.example.com'
+ self.token = 'n2tw0rk70k3n'
+ self.client = network.NetworkRestClient(self.url, self.token)
+
+ def tearDown(self):
+ del self.client
+
+ def _assert(self, method_call, path, set_param=None, params=(), **kwargs):
+ """Assert the REST method call is called as expected"""
+ x0 = - len(params)
+ for i, (k, v, c) in enumerate(params):
+ self.assertEqual(set_param.mock_calls[x0 + i], call(k, v, iff=c))
+
+ self.assertEqual(method_call.mock_calls[-1], call(path, **kwargs))
+
+ @patch('kamaki.clients.Client.get', return_value='ret val')
+ def test_networks_get(self, get):
+ netid = 'netid'
+ for kwargs in (dict(), dict(k1='v1'), dict(k2='v2', k3='v3')):
+ self.assertEqual(self.client.networks_get(**kwargs), 'ret val')
+ self._assert(get, '/networks', **kwargs)
+ self.assertEqual(
+ self.client.networks_get(network_id=netid, **kwargs),
+ 'ret val')
+ self._assert(get, '/networks/%s' % netid, **kwargs)
+
+ @patch('kamaki.clients.Client.post', return_value='ret val')
+ def test_networks_post(self, post):
+ for kwargs in (
+ dict(json_data=dict(k1='v1')),
+ dict(json_data=dict(k2='v2'), k3='v3')):
+ self.assertEqual(self.client.networks_post(**kwargs), 'ret val')
+ json_data = kwargs.pop('json_data')
+ self._assert(post, '/networks', json=json_data, **kwargs)
+
+ @patch('kamaki.clients.Client.put', return_value='ret val')
+ def test_networks_put(self, put):
+ netid = 'netid'
+ for kwargs in (
+ dict(json_data=dict(k1='v1')),
+ dict(json_data=dict(k2='v2'), k3='v3')):
+ self.assertEqual(
+ self.client.networks_put(netid, **kwargs), 'ret val')
+ json_data = kwargs.pop('json_data')
+ self._assert(
+ put, '/networks/%s' % netid, json=json_data, **kwargs)
+
+ @patch('kamaki.clients.Client.delete', return_value='ret val')
+ def test_networks_delete(self, delete):
+ netid = 'netid'
+ for kwargs in (dict(), dict(k1='v1'), dict(k2='v2', k3='v3')):
+ self.assertEqual(
+ self.client.networks_delete(netid, **kwargs), 'ret val')
+ self._assert(delete, '/networks/%s' % netid, **kwargs)
+
+ @patch('kamaki.clients.Client.get', return_value='ret val')
+ def test_subnets_get(self, get):
+ for kwargs in (dict(), dict(k1='v1'), dict(k2='v2', k3='v3')):
+ self.assertEqual(self.client.subnets_get(**kwargs), 'ret val')
+ self._assert(get, '/subnets', **kwargs)
+
+ subnet_id = 'subnet id'
+ self.assertEqual(
+ self.client.subnets_get(subnet_id=subnet_id, **kwargs),
+ 'ret val')
+ self._assert(get, '/subnets/%s' % subnet_id, **kwargs)
+
+ @patch('kamaki.clients.Client.post', return_value='ret val')
+ def test_subnets_post(self, post):
+ for kwargs in (dict(), dict(k1='v1'), dict(k2='v2', k3='v3')):
+ json_data = dict(subnets='some data')
+ self.assertEqual(self.client.subnets_post(
+ json_data=json_data, **kwargs), 'ret val')
+ self._assert(post, '/subnets', json=json_data, **kwargs)
+
+ @patch('kamaki.clients.Client.put', return_value='ret val')
+ def test_subnets_put(self, put):
+ subnet_id = 'subid'
+ for kwargs in (dict(), dict(k1='v1'), dict(k2='v2', k3='v3')):
+ self.assertEqual(
+ self.client.subnets_put(subnet_id, **kwargs), 'ret val')
+ self._assert(put, '/subnets/%s' % subnet_id, **kwargs)
+
+ @patch('kamaki.clients.Client.delete', return_value='ret val')
+ def test_subnets_delete(self, delete):
+ netid = 'netid'
+ for kwargs in (dict(), dict(k1='v1'), dict(k2='v2', k3='v3')):
+ self.assertEqual(
+ self.client.subnets_delete(netid, **kwargs), 'ret val')
+ self._assert(delete, '/subnets/%s' % netid, **kwargs)
+
+ @patch('kamaki.clients.Client.get', return_value='ret val')
+ def test_ports_get(self, get):
+ for kwargs in (dict(), dict(k1='v1'), dict(k2='v2', k3='v3')):
+ self.assertEqual(self.client.ports_get(**kwargs), 'ret val')
+ self._assert(get, '/ports', **kwargs)
+
+ port_id = 'port id'
+ self.assertEqual(
+ self.client.ports_get(port_id=port_id, **kwargs),
+ 'ret val')
+ self._assert(get, '/ports/%s' % port_id, **kwargs)
+
+ @patch('kamaki.clients.Client.post', return_value='ret val')
+ def test_ports_post(self, post):
+ for kwargs in (dict(), dict(k1='v1'), dict(k2='v2', k3='v3')):
+ self.assertEqual(self.client.ports_post(**kwargs), 'ret val')
+ self._assert(post, '/ports', json=None, **kwargs)
+
+ json_data = dict(id='some id', other_param='other val')
+ self.assertEqual(
+ self.client.ports_post(json_data=json_data, **kwargs),
+ 'ret val')
+ self._assert(post, '/ports', json=json_data, **kwargs)
+
+ @patch('kamaki.clients.Client.put', return_value='ret val')
+ def test_ports_put(self, put):
+ port_id = 'portid'
+ for kwargs in (dict(), dict(k1='v1'), dict(k2='v2', k3='v3')):
+ self.assertEqual(
+ self.client.ports_put(port_id, **kwargs), 'ret val')
+ self._assert(put, '/ports/%s' % port_id, json=None, **kwargs)
+
+ json_data = dict(id='some id', other_param='other val')
+ self.assertEqual(
+ self.client.ports_put(port_id, json_data=json_data, **kwargs),
+ 'ret val')
+ self._assert(put, '/ports/%s' % port_id, json=json_data, **kwargs)
+
+
+class FakeObject(object):
+
+ json = None
+ headers = None
+
+
+class NetworkClient(TestCase):
+
+ """Set up a ComputesRest thorough test"""
+ def setUp(self):
+ self.url = 'http://network.example.com'
+ self.token = 'n2tw0rk70k3n'
+ self.client = network.NetworkClient(self.url, self.token)
+
+ def tearDown(self):
+ FakeObject.json, FakeObject.headers = None, None
+ del self.client
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.networks_get',
+ return_value=FakeObject())
+ def test_list_networks(self, networks_get):
+ FakeObject.json = dict(networks='ret val')
+ self.assertEqual(self.client.list_networks(), 'ret val')
+ networks_get.assert_called_once_with(success=200)
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.networks_post',
+ return_value=FakeObject())
+ def test_create_network(self, networks_post):
+ for admin_state_up, shared in product((None, True), (None, True)):
+ FakeObject.json = dict(network='ret val')
+ name = 'net name'
+ self.assertEqual(
+ self.client.create_network(
+ name, admin_state_up=admin_state_up, shared=shared),
+ 'ret val')
+ req = dict(name=name, admin_state_up=bool(admin_state_up))
+ if shared:
+ req['shared'] = shared
+ expargs = dict(json_data=dict(network=req), success=201)
+ self.assertEqual(networks_post.mock_calls[-1], call(**expargs))
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.networks_post',
+ return_value=FakeObject())
+ def test_create_networks(self, networks_post):
+ for networks in (
+ None, dict(name='name'), 'nets', [1, 2, 3], [{'k': 'v'}, ],
+ [dict(admin_state_up=True, shared=True)],
+ [dict(name='n1', invalid='mistake'), ],
+ [dict(name='valid', shared=True), {'err': 'nop'}]):
+ self.assertRaises(
+ ValueError, self.client.create_networks, networks)
+
+ FakeObject.json = dict(networks='ret val')
+ for networks in (
+ [
+ dict(name='net1'),
+ dict(name='net 2', admin_state_up=False, shared=True)],
+ [
+ dict(name='net1', admin_state_up=True),
+ dict(name='net 2', shared=False),
+ dict(name='net-3')],
+ (dict(name='n.e.t'), dict(name='net 2'))):
+ self.assertEqual(self.client.create_networks(networks), 'ret val')
+
+ networks = list(networks)
+ expargs = dict(json_data=dict(networks=networks), success=201)
+ self.assertEqual(networks_post.mock_calls[-1], call(**expargs))
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.networks_get',
+ return_value=FakeObject())
+ def test_get_network_details(self, networks_get):
+ netid, FakeObject.json = 'netid', dict(network='ret val')
+ self.assertEqual(self.client.get_network_details(netid), 'ret val')
+ networks_get.assert_called_once_with(netid, success=200)
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.networks_put',
+ return_value=FakeObject())
+ def test_update_network(self, networks_put):
+ netid, FakeObject.json = 'netid', dict(network='ret val')
+ for name, admin_state_up, shared in product(
+ ('net name', None), (True, None), (True, None)):
+ kwargs = dict(
+ name=name, admin_state_up=admin_state_up, shared=shared)
+ self.assertEqual(
+ self.client.update_network(netid, **kwargs), 'ret val')
+ if name in (None, ):
+ kwargs.pop('name')
+ if admin_state_up in (None, ):
+ kwargs.pop('admin_state_up')
+ if shared in (None, ):
+ kwargs.pop('shared')
+ kwargs = dict(json_data=dict(network=kwargs), success=200)
+ self.assertEqual(
+ networks_put.mock_calls[-1], call(netid, **kwargs))
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.networks_delete',
+ return_value=FakeObject())
+ def test_delete_network(self, networks_delete):
+ netid, FakeObject.headers = 'netid', 'ret headers'
+ self.assertEqual(self.client.delete_network(netid), 'ret headers')
+ networks_delete.assert_called_once_with(netid, success=204)
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.subnets_get',
+ return_value=FakeObject())
+ def test_list_subnets(self, subnets_get):
+ FakeObject.json = dict(subnets='ret val')
+ self.assertEqual(self.client.list_subnets(), 'ret val')
+ subnets_get.assert_called_once_with(success=200)
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.subnets_post',
+ return_value=FakeObject())
+ def test_create_subnet(self, subnets_post):
+ for (
+ name, allocation_pools, gateway_ip,
+ subnet_id, ipv6, enable_dhcp) in product(
+ ('name', None), ('all pools', None), ('gip', None),
+ ('sid', None), (True, None), (True, None)):
+ kwargs = dict(
+ name=name, allocation_pools=allocation_pools,
+ gateway_ip=gateway_ip, subnet_id=subnet_id,
+ ipv6=ipv6, enable_dhcp=enable_dhcp)
+ FakeObject.json, network_id, cidr = dict(subnet='rv'), 'name', 'cd'
+ self.assertEqual(
+ self.client.create_subnet(network_id, cidr, **kwargs), 'rv')
+ req = dict(
+ network_id=network_id, cidr=cidr,
+ ip_version=6 if kwargs.pop('ipv6', None) else 4)
+ for k, v in kwargs.items():
+ if v:
+ req['id' if k == 'subnet_id' else k] = v
+ expargs = dict(json_data=dict(subnet=req), success=201)
+ self.assertEqual(subnets_post.mock_calls[-1], call(**expargs))
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.subnets_post',
+ return_value=FakeObject())
+ def test_create_subnets(self, subnets_post):
+ for subnets in (
+ None, dict(network_id='name'), 'nets', [1, 2, 3], [{'k': 'v'}],
+ [dict(ipv6=True, enable_dhcp=True)],
+ [dict(network_id='n1', cidr='dr', invalid='mistake'), ],
+ [dict(network_id='valid', cidr='valid'), {'err': 'nop'}]):
+ self.assertRaises(
+ ValueError, self.client.create_subnets, subnets)
+
+ FakeObject.json = dict(subnets='ret val')
+ for subnets in (
+ [
+ dict(network_id='n1', cidr='c1'),
+ dict(network_id='n 2', cidr='c 2', name='name')],
+ [
+ dict(network_id='n1', cidr='c 6', allocation_pools='a p'),
+ dict(network_id='n 2', cidr='c_4', gateway_ip='g ip'),
+ dict(network_id='n 2', cidr='c_4', subnet_id='s id'),
+ dict(network_id='n-4', cidr='c3', ipv6=True, name='w. 6'),
+ dict(network_id='n_5', cidr='c2', enable_dhcp=True)],
+ (
+ dict(network_id='n.e.t', cidr='c-5'),
+ dict(network_id='net 2', cidr='c 2'))):
+ self.assertEqual(self.client.create_subnets(subnets), 'ret val')
+
+ for subnet in subnets:
+ subnet['ip_version'] = 6 if subnet.pop('ipv6', None) else 4
+ if 'subnet_id' in subnet:
+ subnet['id'] = subnet.pop('subnet_id')
+ subnets = list(subnets)
+ expargs = dict(json_data=dict(subnets=subnets), success=201)
+ self.assertEqual(subnets_post.mock_calls[-1], call(**expargs))
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.subnets_get',
+ return_value=FakeObject())
+ def test_get_subnet_details(self, subnets_get):
+ subid, FakeObject.json = 'subid', 'ret val'
+ self.assertEqual(self.client.get_subnet_details(subid), 'ret val')
+ subnets_get.assert_called_once_with(subid, success=201)
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.subnets_put',
+ return_value=FakeObject())
+ def test_update_subnet(self, subnets_put):
+ for (
+ name, allocation_pools, gateway_ip,
+ subnet_id, ipv6, enable_dhcp) in product(
+ ('name', None), ('all pools', None), ('gip', None),
+ ('sid', None), (True, False, None), (True, False, None)):
+ kwargs = dict(
+ name=name, allocation_pools=allocation_pools,
+ gateway_ip=gateway_ip, subnet_id=subnet_id,
+ ipv6=ipv6, enable_dhcp=enable_dhcp)
+ FakeObject.json, network_id, cidr = dict(subnet='rv'), 'name', 'cd'
+ self.assertEqual(
+ self.client.update_subnet(network_id, cidr, **kwargs), 'rv')
+ req = dict(network_id=network_id, cidr=cidr)
+ if kwargs.get('ipv6', None) not in (None, ):
+ req['ip_version'] = 6 if kwargs.pop('ipv6') else 4
+ for k, v in kwargs.items():
+ if v not in (None, ):
+ req['id' if k == 'subnet_id' else k] = v
+ expargs = dict(json_data=dict(subnet=req), success=201)
+ self.assertEqual(subnets_put.mock_calls[-1], call(**expargs))
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.subnets_delete',
+ return_value=FakeObject())
+ def test_delete_subnet(self, subnets_delete):
+ netid, FakeObject.headers = 'netid', 'ret headers'
+ self.assertEqual(self.client.delete_subnet(netid), 'ret headers')
+ subnets_delete.assert_called_once_with(netid, success=204)
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.ports_get',
+ return_value=FakeObject())
+ def test_list_ports(self, ports_get):
+ FakeObject.json = dict(ports='ret val')
+ self.assertEqual(self.client.list_ports(), 'ret val')
+ ports_get.assert_called_once_with(success=200)
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.ports_post',
+ return_value=FakeObject())
+ def test_create_port(self, ports_post):
+ for (
+ name, status, admin_state_up,
+ mac_address, fixed_ips, security_groups
+ ) in product(
+ ('name', None), ('status', None), (True, False, None),
+ ('maddr', None), ('some ips', None), ([1, 2, 3], None)):
+ kwargs = dict(
+ name=name, status=status, admin_state_up=admin_state_up,
+ mac_address=mac_address, fixed_ips=fixed_ips,
+ security_groups=security_groups)
+ FakeObject.json, network_id = dict(port='ret val'), 'name'
+ self.assertEqual(
+ self.client.create_port(network_id, **kwargs), 'ret val')
+ req = dict(network_id=network_id)
+ for k, v in kwargs.items():
+ if v not in (None, ):
+ req[k] = v
+ expargs = dict(json_data=dict(port=req), success=201)
+ self.assertEqual(ports_post.mock_calls[-1], call(**expargs))
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.ports_post',
+ return_value=FakeObject())
+ def test_create_ports(self, ports_post):
+ for ports in (
+ None, dict(network_id='name'), 'nets', [1, 2, 3], [{'k': 'v'}],
+ [dict(name=True, mac_address='mac')],
+ [dict(network_id='n1', invalid='mistake'), ],
+ [dict(network_id='valid', name='valid'), {'err': 'nop'}]):
+ self.assertRaises(
+ ValueError, self.client.create_ports, ports)
+
+ FakeObject.json = dict(ports='ret val')
+ for ports in (
+ [dict(network_id='n1'), dict(network_id='n 2', name='name')],
+ [
+ dict(network_id='n1', name='n 6', status='status'),
+ dict(network_id='n 2', admin_state_up=True, fixed_ips='f'),
+ dict(network_id='n 2', mac_address='mc', name='n.a.m.e.'),
+ dict(network_id='n-4', security_groups='s~G', name='w. 6'),
+ dict(network_id='n_5', admin_state_up=False, name='f a')],
+ (
+ dict(network_id='n.e.t', name='c-5'),
+ dict(network_id='net 2', status='YEAH'))):
+ self.assertEqual(self.client.create_ports(ports), 'ret val')
+ expargs = dict(json_data=dict(ports=list(ports)), success=201)
+ self.assertEqual(ports_post.mock_calls[-1], call(**expargs))
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.ports_get',
+ return_value=FakeObject())
+ def test_get_port_details(self, ports_get):
+ portid, FakeObject.json = 'portid', dict(ports='ret val')
+ self.assertEqual(self.client.get_port_details(portid), 'ret val')
+ ports_get.assert_called_once_with(portid, success=201)
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.ports_delete',
+ return_value=FakeObject())
+ def test_delete_port(self, ports_delete):
+ portid, FakeObject.headers = 'portid', 'ret headers'
+ self.assertEqual(self.client.delete_port(portid), 'ret headers')
+ ports_delete.assert_called_once_with(portid, success=204)
+
+ @patch(
+ 'kamaki.clients.network.NetworkClient.ports_put',
+ return_value=FakeObject())
+ def test_update_port(self, ports_put):
+ for (
+ name, status, admin_state_up, mac_address, fixed_ips,
+ security_groups) in product(
+ ('name', None), ('st', None), (True, None), ('mc', None),
+ ('fps', None), ('sg', None)):
+ FakeObject.json = dict(port='rv')
+ port_id, network_id = 'pid', 'nid'
+ kwargs = dict(
+ network_id=network_id, name=name, status=status,
+ admin_state_up=admin_state_up, mac_address=mac_address,
+ fixed_ips=fixed_ips, security_groups=security_groups)
+ self.assertEqual(
+ self.client.update_port(port_id, **kwargs), 'rv')
+ req = dict()
+ for k, v in kwargs.items():
+ if v:
+ req[k] = v
+ expargs = dict(json_data=dict(port=req), success=201)
+ self.assertEqual(
+ ports_put.mock_calls[-1], call(port_id, **expargs))
+
+
+if __name__ == '__main__':
+ from sys import argv
+ from kamaki.clients.test import runTestCase
+ not_found = True
+ if not argv[1:] or argv[1] == 'NetworkClient':
+ not_found = False
+ runTestCase(NetworkClient, 'Network Client', argv[2:])
+ if not argv[1:] or argv[1] == 'NetworkRestClient':
+ not_found = False
+ runTestCase(NetworkRestClient, 'NetworkRest Client', argv[2:])
+ if not_found:
+ print('TestCase %s not found' % argv[1])
from kamaki.clients.utils.test import Utils
from kamaki.clients.astakos.test import AstakosClient
from kamaki.clients.compute.test import ComputeClient, ComputeRestClient
-from kamaki.clients.cyclades.test import CycladesClient
+from kamaki.clients.network.test import (NetworkClient, NetworkRestClient)
+from kamaki.clients.cyclades.test import CycladesClient, CycladesNetworkClient
from kamaki.clients.cyclades.test import CycladesRestClient
from kamaki.clients.image.test import ImageClient
from kamaki.clients.storage.test import StorageClient
'kamaki.clients.astakos',
'kamaki.clients.compute',
'kamaki.clients.cyclades',
+ 'kamaki.clients.network'
],
classifiers=[
'Operating System :: OS Independent',