@errors.generic.all
@errors.cyclades.connection
def _run(self, subnet_id):
    """Rename a subnet and print the record returned by the client.

    :param subnet_id: id of the subnet to update; passed through to
        self.client.update_subnet unchanged
    """
    # Only the subnet id is needed to address the update, so the new name
    # is sent directly without first fetching the subnet details.
    updated = self.client.update_subnet(subnet_id, name=self['new_name'])
    self._print(updated, self.print_dict)
def main(self, subnet_id):
return r.json
def update_subnet(
        self, subnet_id,
        name=None, allocation_pools=None, gateway_ip=None, ipv6=None,
        enable_dhcp=None):
    """Update a subnet, sending only the fields that were explicitly given.

    :param subnet_id: (str) passed to self.subnets_put to address the subnet
    :param name: (str) The subnet name
    :param allocation_pools: (list of dicts) start/end addresses of
        allocation pools: [{'start': ..., 'end': ...}, ...]
    :param gateway_ip: (str)
    :param ipv6: (bool) ip_version == 6 if true, 4 if false
    :param enable_dhcp: (bool)

    :returns: the 'subnet' entry of the response JSON
    """
    subnet = {}
    # None means "leave unchanged"; falsy values such as '' or False are
    # legitimate updates and must still be sent.
    if name is not None:
        subnet['name'] = name
    if allocation_pools is not None:
        subnet['allocation_pools'] = allocation_pools
    if gateway_ip is not None:
        subnet['gateway_ip'] = gateway_ip
    if ipv6 is not None:
        subnet['ip_version'] = 6 if ipv6 else 4
    if enable_dhcp is not None:
        subnet['enable_dhcp'] = enable_dhcp
    r = self.subnets_put(
        subnet_id, json=dict(subnet=subnet), success=200)
    return r.json['subnet']
def delete_subnet(self, subnet_id):
def test_update_subnet(self, subnets_put):
    """Check update_subnet over every combination of optional arguments.

    For each combination, the call must return the 'subnet' entry of the
    fake response and must invoke subnets_put with the subnet id, a body
    holding only the non-None fields (ipv6 translated to ip_version) and
    success=200.
    """
    # The fake response and the subnet id are the same for every combination.
    FakeObject.json, subnet_id = dict(subnet='rv'), 'sid'
    for (
            name, allocation_pools, gateway_ip,
            ipv6, enable_dhcp) in product(
                ('name', None), ('all pools', None), ('gip', None),
                (True, False, None), (True, False, None)):
        kwargs = dict(
            name=name, allocation_pools=allocation_pools,
            gateway_ip=gateway_ip, ipv6=ipv6, enable_dhcp=enable_dhcp)
        self.assertEqual(
            self.client.update_subnet(subnet_id, **kwargs), 'rv')

        # Build the request body the client is expected to send.
        expected = {}
        for key, value in kwargs.items():
            if value is None:
                continue
            if key == 'ipv6':
                expected['ip_version'] = 6 if value else 4
            else:
                expected[key] = value
        expargs = dict(json=dict(subnet=expected), success=200)
        self.assertEqual(
            subnets_put.mock_calls[-1], call(subnet_id, **expargs))
@patch(
'kamaki.clients.network.NetworkClient.subnets_delete',
continue
self.client.request(method, path, **kwargs)
self.assertEqual(
- RespInit.mock_calls[-1], call(FR, connection_retry_limit=0))
+ RespInit.mock_calls[-1],
+ call(FR, connection_retry_limit=0, poolsize=None))
@patch('kamaki.clients.Client.request', return_value='lala')
def _test_foo(self, foo, request):