Revision 1c3231aa lib/rpc.py
b/lib/rpc.py | ||
---|---|---|
275 | 275 |
ip = ipmap.get(node) |
276 | 276 |
if ip is None: |
277 | 277 |
ip = nslookup_fn(node, family=family) |
278 |
result.append((node, ip)) |
|
278 |
result.append((node, ip, node))
|
|
279 | 279 |
|
280 | 280 |
return result |
281 | 281 |
|
... | ... | |
292 | 292 |
|
293 | 293 |
""" |
294 | 294 |
assert len(hosts) == len(self._addresses) |
295 |
return zip(hosts, self._addresses) |
|
295 |
return zip(hosts, self._addresses, hosts)
|
|
296 | 296 |
|
297 | 297 |
|
298 |
def _CheckConfigNode(name, node, accept_offline_node): |
|
298 |
def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
|
|
299 | 299 |
"""Checks if a node is online. |
300 | 300 |
|
301 |
@type name: string |
|
302 |
@param name: Node name
|
|
301 |
@type node_uuid_or_name: string
|
|
302 |
@param node_uuid_or_name: Node UUID
|
|
303 | 303 |
@type node: L{objects.Node} or None |
304 | 304 |
@param node: Node object |
305 | 305 |
|
306 | 306 |
""" |
307 | 307 |
if node is None: |
308 |
# Depend on DNS for name resolution |
|
309 |
ip = name |
|
310 |
elif node.offline and not accept_offline_node: |
|
311 |
ip = _OFFLINE |
|
308 |
# Assume that the passed parameter was actually a node name, so depend on |
|
309 |
# DNS for name resolution |
|
310 |
return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name) |
|
312 | 311 |
else: |
313 |
ip = node.primary_ip |
|
314 |
return (name, ip) |
|
312 |
if node.offline and not accept_offline_node: |
|
313 |
ip = _OFFLINE |
|
314 |
else: |
|
315 |
ip = node.primary_ip |
|
316 |
return (node.name, ip, node_uuid_or_name) |
|
315 | 317 |
|
316 | 318 |
|
317 |
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
|
|
319 |
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
|
|
318 | 320 |
"""Calculate node addresses using configuration. |
319 | 321 |
|
322 |
Note that strings in node_uuids are treated as node names if the UUID is not |
|
323 |
found in the configuration. |
|
324 |
|
|
320 | 325 |
""" |
321 | 326 |
accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE) |
322 | 327 |
|
323 | 328 |
assert accept_offline_node or opts is None, "Unknown option" |
324 | 329 |
|
325 | 330 |
# Special case for single-host lookups |
326 |
if len(hosts) == 1:
|
|
327 |
(name, ) = hosts
|
|
328 |
return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
|
|
331 |
if len(node_uuids) == 1:
|
|
332 |
(uuid, ) = node_uuids
|
|
333 |
return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]
|
|
329 | 334 |
else: |
330 | 335 |
all_nodes = all_nodes_fn() |
331 |
return [_CheckConfigNode(name, all_nodes.get(name, None),
|
|
336 |
return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
|
|
332 | 337 |
accept_offline_node) |
333 |
for name in hosts]
|
|
338 |
for uuid in node_uuids]
|
|
334 | 339 |
|
335 | 340 |
|
336 | 341 |
class _RpcProcessor: |
337 | 342 |
def __init__(self, resolver, port, lock_monitor_cb=None): |
338 | 343 |
"""Initializes this class. |
339 | 344 |
|
340 |
@param resolver: callable accepting a list of hostnames, returning a list |
|
341 |
of tuples containing name and IP address (IP address can be the name or |
|
342 |
the special value L{_OFFLINE} to mark offline machines) |
|
345 |
@param resolver: callable accepting a list of node UUIDs or hostnames, |
|
346 |
returning a list of tuples containing name, IP address and original name |
|
347 |
of the resolved node. IP address can be the name or the special value |
|
348 |
L{_OFFLINE} to mark offline machines. |
|
343 | 349 |
@type port: int |
344 | 350 |
@param port: TCP port |
345 | 351 |
@param lock_monitor_cb: Callable for registering with lock monitor |
... | ... | |
363 | 369 |
assert isinstance(body, dict) |
364 | 370 |
assert len(body) == len(hosts) |
365 | 371 |
assert compat.all(isinstance(v, str) for v in body.values()) |
366 |
assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
|
|
372 |
assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
|
|
367 | 373 |
"%s != %s" % (hosts, body.keys()) |
368 | 374 |
|
369 |
for (name, ip) in hosts: |
|
375 |
for (name, ip, original_name) in hosts:
|
|
370 | 376 |
if ip is _OFFLINE: |
371 | 377 |
# Node is marked as offline |
372 |
results[name] = RpcResult(node=name, offline=True, call=procedure) |
|
378 |
results[original_name] = RpcResult(node=name, |
|
379 |
offline=True, |
|
380 |
call=procedure) |
|
373 | 381 |
else: |
374 |
requests[name] = \ |
|
382 |
requests[original_name] = \
|
|
375 | 383 |
http.client.HttpClientRequest(str(ip), port, |
376 | 384 |
http.HTTP_POST, str("/%s" % procedure), |
377 | 385 |
headers=_RPC_CLIENT_HEADERS, |
378 |
post_data=body[name], |
|
386 |
post_data=body[original_name],
|
|
379 | 387 |
read_timeout=read_timeout, |
380 | 388 |
nicename="%s/%s" % (name, procedure), |
381 | 389 |
curl_config_fn=_ConfigRpcCurl) |
... | ... | |
406 | 414 |
|
407 | 415 |
return results |
408 | 416 |
|
409 |
def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
|
|
417 |
def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
|
|
410 | 418 |
_req_process_fn=None): |
411 | 419 |
"""Makes an RPC request to a number of nodes. |
412 | 420 |
|
413 |
@type hosts: sequence
|
|
414 |
@param hosts: Hostnames
|
|
421 |
@type nodes: sequence
|
|
422 |
@param nodes: node UUIDs or Hostnames
|
|
415 | 423 |
@type procedure: string |
416 | 424 |
@param procedure: Request path |
417 | 425 |
@type body: dictionary |
... | ... | |
429 | 437 |
_req_process_fn = http.client.ProcessRequests |
430 | 438 |
|
431 | 439 |
(results, requests) = \ |
432 |
self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
|
|
440 |
self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
|
|
433 | 441 |
procedure, body, read_timeout) |
434 | 442 |
|
435 | 443 |
_req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb) |
... | ... | |
675 | 683 |
return [annotation_fn(disk.Copy(), ld_params) for disk in disks] |
676 | 684 |
|
677 | 685 |
|
678 |
def _GetESFlag(cfg, nodename):
|
|
679 |
ni = cfg.GetNodeInfo(nodename)
|
|
686 |
def _GetESFlag(cfg, node_uuid):
|
|
687 |
ni = cfg.GetNodeInfo(node_uuid)
|
|
680 | 688 |
if ni is None: |
681 |
raise errors.OpPrereqError("Invalid node name %s" % nodename,
|
|
689 |
raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
|
|
682 | 690 |
errors.ECODE_NOENT) |
683 | 691 |
return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE] |
684 | 692 |
|
685 | 693 |
|
686 |
def GetExclusiveStorageForNodeNames(cfg, nodelist):
|
|
694 |
def GetExclusiveStorageForNodes(cfg, node_uuids):
|
|
687 | 695 |
"""Return the exclusive storage flag for all the given nodes. |
688 | 696 |
|
689 | 697 |
@type cfg: L{config.ConfigWriter} |
690 | 698 |
@param cfg: cluster configuration |
691 |
@type nodelist: list or tuple
|
|
692 |
@param nodelist: node names for which to read the flag
|
|
699 |
@type node_uuids: list or tuple
|
|
700 |
@param node_uuids: node UUIDs for which to read the flag
|
|
693 | 701 |
@rtype: dict |
694 | 702 |
@return: mapping from node names to exclusive storage flags |
695 |
@raise errors.OpPrereqError: if any given node name has no corresponding node |
|
703 |
@raise errors.OpPrereqError: if any given node name has no corresponding |
|
704 |
node |
|
696 | 705 |
|
697 | 706 |
""" |
698 | 707 |
getflag = lambda n: _GetESFlag(cfg, n) |
699 |
flags = map(getflag, nodelist)
|
|
700 |
return dict(zip(nodelist, flags))
|
|
708 |
flags = map(getflag, node_uuids)
|
|
709 |
return dict(zip(node_uuids, flags))
|
|
701 | 710 |
|
702 | 711 |
|
703 | 712 |
#: Generic encoders |
Also available in: Unified diff