Revision cd40dc53 lib/rpc.py
b/lib/rpc.py | ||
---|---|---|
47 | 47 |
from ganeti import ssconf |
48 | 48 |
from ganeti import runtime |
49 | 49 |
from ganeti import compat |
50 |
from ganeti import rpc_defs |
|
50 | 51 |
|
51 | 52 |
# Special module generated at build time |
52 | 53 |
from ganeti import _generated_rpc |
... | ... | |
409 | 410 |
return self._CombineResults(results, requests, procedure) |
410 | 411 |
|
411 | 412 |
|
412 |
class RpcRunner(_generated_rpc.RpcClientDefault, |
|
413 |
class _RpcClientBase: |
|
414 |
def __init__(self, resolver, encoder_fn, lock_monitor_cb=None): |
|
415 |
"""Initializes this class. |
|
416 |
|
|
417 |
""" |
|
418 |
self._proc = _RpcProcessor(resolver, |
|
419 |
netutils.GetDaemonPort(constants.NODED), |
|
420 |
lock_monitor_cb=lock_monitor_cb) |
|
421 |
self._encoder = compat.partial(self._EncodeArg, encoder_fn) |
|
422 |
|
|
423 |
@staticmethod |
|
424 |
def _EncodeArg(encoder_fn, (argkind, value)): |
|
425 |
"""Encode argument. |
|
426 |
|
|
427 |
""" |
|
428 |
if argkind is None: |
|
429 |
return value |
|
430 |
else: |
|
431 |
return encoder_fn(argkind)(value) |
|
432 |
|
|
433 |
def _Call(self, node_list, procedure, timeout, argdefs, args): |
|
434 |
"""Entry point for automatically generated RPC wrappers. |
|
435 |
|
|
436 |
""" |
|
437 |
assert len(args) == len(argdefs), "Wrong number of arguments" |
|
438 |
|
|
439 |
body = serializer.DumpJson(map(self._encoder, zip(argdefs, args)), |
|
440 |
indent=False) |
|
441 |
|
|
442 |
return self._proc(node_list, procedure, body, read_timeout=timeout) |
|
443 |
|
|
444 |
|
|
445 |
def _ObjectToDict(value): |
|
446 |
"""Converts an object to a dictionary. |
|
447 |
|
|
448 |
@note: See L{objects}. |
|
449 |
|
|
450 |
""" |
|
451 |
return value.ToDict() |
|
452 |
|
|
453 |
|
|
454 |
def _ObjectListToDict(value): |
|
455 |
"""Converts a list of L{objects} to dictionaries. |
|
456 |
|
|
457 |
""" |
|
458 |
return map(_ObjectToDict, value) |
|
459 |
|
|
460 |
|
|
461 |
def _EncodeNodeToDiskDict(value): |
|
462 |
"""Encodes a dictionary with node name as key and disk objects as values. |
|
463 |
|
|
464 |
""" |
|
465 |
return dict((name, _ObjectListToDict(disks)) |
|
466 |
for name, disks in value.items()) |
|
467 |
|
|
468 |
|
|
469 |
def _PrepareFileUpload(filename): |
|
470 |
"""Loads a file and prepares it for an upload to nodes. |
|
471 |
|
|
472 |
""" |
|
473 |
data = _Compress(utils.ReadFile(filename)) |
|
474 |
st = os.stat(filename) |
|
475 |
getents = runtime.GetEnts() |
|
476 |
return [filename, data, st.st_mode, getents.LookupUid(st.st_uid), |
|
477 |
getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime] |
|
478 |
|
|
479 |
|
|
480 |
def _PrepareFinalizeExportDisks(snap_disks): |
|
481 |
"""Encodes disks for finalizing export. |
|
482 |
|
|
483 |
""" |
|
484 |
flat_disks = [] |
|
485 |
|
|
486 |
for disk in snap_disks: |
|
487 |
if isinstance(disk, bool): |
|
488 |
flat_disks.append(disk) |
|
489 |
else: |
|
490 |
flat_disks.append(disk.ToDict()) |
|
491 |
|
|
492 |
return flat_disks |
|
493 |
|
|
494 |
|
|
495 |
def _EncodeImportExportIO((ieio, ieioargs)): |
|
496 |
"""Encodes import/export I/O information. |
|
497 |
|
|
498 |
""" |
|
499 |
if ieio == constants.IEIO_RAW_DISK: |
|
500 |
assert len(ieioargs) == 1 |
|
501 |
return (ieio, (ieioargs[0].ToDict(), )) |
|
502 |
|
|
503 |
if ieio == constants.IEIO_SCRIPT: |
|
504 |
assert len(ieioargs) == 2 |
|
505 |
return (ieio, (ieioargs[0].ToDict(), ieioargs[1])) |
|
506 |
|
|
507 |
return (ieio, ieioargs) |
|
508 |
|
|
509 |
|
|
510 |
def _EncodeBlockdevRename(value): |
|
511 |
"""Encodes information for renaming block devices. |
|
512 |
|
|
513 |
""" |
|
514 |
return [(d.ToDict(), uid) for d, uid in value] |
|
515 |
|
|
516 |
|
|
517 |
#: Generic encoders |
|
518 |
_ENCODERS = { |
|
519 |
rpc_defs.ED_OBJECT_DICT: _ObjectToDict, |
|
520 |
rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict, |
|
521 |
rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict, |
|
522 |
rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload, |
|
523 |
rpc_defs.ED_COMPRESS: _Compress, |
|
524 |
rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks, |
|
525 |
rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO, |
|
526 |
rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename, |
|
527 |
} |
|
528 |
|
|
529 |
|
|
530 |
class RpcRunner(_RpcClientBase, |
|
531 |
_generated_rpc.RpcClientDefault, |
|
413 | 532 |
_generated_rpc.RpcClientBootstrap, |
414 | 533 |
_generated_rpc.RpcClientConfig): |
415 | 534 |
"""RPC runner class. |
... | ... | |
422 | 541 |
@param context: Ganeti context |
423 | 542 |
|
424 | 543 |
""" |
544 |
self._cfg = context.cfg |
|
545 |
|
|
546 |
encoders = _ENCODERS.copy() |
|
547 |
|
|
548 |
# Add encoders requiring configuration object |
|
549 |
encoders.update({ |
|
550 |
rpc_defs.ED_INST_DICT: self._InstDict, |
|
551 |
rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep, |
|
552 |
rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp, |
|
553 |
}) |
|
554 |
|
|
555 |
# Resolver using configuration |
|
556 |
resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo, |
|
557 |
self._cfg.GetAllNodesInfo) |
|
558 |
|
|
425 | 559 |
# Pylint doesn't recognize multiple inheritance properly, see |
426 | 560 |
# <http://www.logilab.org/ticket/36586> and |
427 | 561 |
# <http://www.logilab.org/ticket/35642> |
428 | 562 |
# pylint: disable=W0233 |
563 |
_RpcClientBase.__init__(self, resolver, encoders.get, |
|
564 |
lock_monitor_cb=context.glm.AddToLockMonitor) |
|
429 | 565 |
_generated_rpc.RpcClientConfig.__init__(self) |
430 | 566 |
_generated_rpc.RpcClientBootstrap.__init__(self) |
431 | 567 |
_generated_rpc.RpcClientDefault.__init__(self) |
432 | 568 |
|
433 |
self._cfg = context.cfg |
|
434 |
self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver, |
|
435 |
self._cfg.GetNodeInfo, |
|
436 |
self._cfg.GetAllNodesInfo), |
|
437 |
netutils.GetDaemonPort(constants.NODED), |
|
438 |
lock_monitor_cb=context.glm.AddToLockMonitor) |
|
439 |
|
|
440 | 569 |
def _InstDict(self, instance, hvp=None, bep=None, osp=None): |
441 | 570 |
"""Convert the given instance to a dict. |
442 | 571 |
|
... | ... | |
485 | 614 |
""" |
486 | 615 |
return self._InstDict(instance, osp=osparams) |
487 | 616 |
|
488 |
def _Call(self, node_list, procedure, timeout, args): |
|
489 |
"""Entry point for automatically generated RPC wrappers. |
|
490 |
|
|
491 |
""" |
|
492 |
body = serializer.DumpJson(args, indent=False) |
|
493 |
|
|
494 |
return self._proc(node_list, procedure, body, read_timeout=timeout) |
|
495 |
|
|
496 | 617 |
@staticmethod |
497 | 618 |
def _MigrationStatusPostProc(result): |
498 | 619 |
if not result.fail_msg and result.payload is not None: |
... | ... | |
531 | 652 |
return result |
532 | 653 |
|
533 | 654 |
@staticmethod |
534 |
def _PrepareFinalizeExportDisks(snap_disks): |
|
535 |
flat_disks = [] |
|
536 |
|
|
537 |
for disk in snap_disks: |
|
538 |
if isinstance(disk, bool): |
|
539 |
flat_disks.append(disk) |
|
540 |
else: |
|
541 |
flat_disks.append(disk.ToDict()) |
|
542 |
|
|
543 |
return flat_disks |
|
544 |
|
|
545 |
@staticmethod |
|
546 | 655 |
def _ImpExpStatusPostProc(result): |
547 | 656 |
"""Post-processor for import/export status. |
548 | 657 |
|
... | ... | |
564 | 673 |
|
565 | 674 |
return result |
566 | 675 |
|
567 |
@staticmethod |
|
568 |
def _EncodeImportExportIO((ieio, ieioargs)): |
|
569 |
"""Encodes import/export I/O information. |
|
570 |
|
|
571 |
""" |
|
572 |
if ieio == constants.IEIO_RAW_DISK: |
|
573 |
assert len(ieioargs) == 1 |
|
574 |
return (ieio, (ieioargs[0].ToDict(), )) |
|
575 |
|
|
576 |
if ieio == constants.IEIO_SCRIPT: |
|
577 |
assert len(ieioargs) == 2 |
|
578 |
return (ieio, (ieioargs[0].ToDict(), ieioargs[1])) |
|
579 |
|
|
580 |
return (ieio, ieioargs) |
|
581 |
|
|
582 |
@staticmethod |
|
583 |
def _PrepareFileUpload(filename): |
|
584 |
"""Loads a file and prepares it for an upload to nodes. |
|
585 |
|
|
586 |
""" |
|
587 |
data = _Compress(utils.ReadFile(filename)) |
|
588 |
st = os.stat(filename) |
|
589 |
getents = runtime.GetEnts() |
|
590 |
return [filename, data, st.st_mode, getents.LookupUid(st.st_uid), |
|
591 |
getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime] |
|
592 |
|
|
593 | 676 |
# |
594 | 677 |
# Begin RPC calls |
595 | 678 |
# |
... | ... | |
605 | 688 |
read_timeout=int(duration + 5)) |
606 | 689 |
|
607 | 690 |
|
608 |
class JobQueueRunner(_generated_rpc.RpcClientJobQueue): |
|
691 |
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
|
|
609 | 692 |
"""RPC wrappers for job queue. |
610 | 693 |
|
611 | 694 |
""" |
612 |
_Compress = staticmethod(_Compress) |
|
613 |
|
|
614 | 695 |
def __init__(self, context, address_list): |
615 | 696 |
"""Initializes this class. |
616 | 697 |
|
617 | 698 |
""" |
618 |
_generated_rpc.RpcClientJobQueue.__init__(self) |
|
619 |
|
|
620 | 699 |
if address_list is None: |
621 | 700 |
resolver = _SsconfResolver |
622 | 701 |
else: |
623 | 702 |
# Caller provided an address list |
624 | 703 |
resolver = _StaticResolver(address_list) |
625 | 704 |
|
626 |
self._proc = _RpcProcessor(resolver, |
|
627 |
netutils.GetDaemonPort(constants.NODED), |
|
628 |
lock_monitor_cb=context.glm.AddToLockMonitor) |
|
629 |
|
|
630 |
def _Call(self, node_list, procedure, timeout, args): |
|
631 |
"""Entry point for automatically generated RPC wrappers. |
|
632 |
|
|
633 |
""" |
|
634 |
body = serializer.DumpJson(args, indent=False) |
|
635 |
|
|
636 |
return self._proc(node_list, procedure, body, read_timeout=timeout) |
|
705 |
_RpcClientBase.__init__(self, resolver, _ENCODERS.get, |
|
706 |
lock_monitor_cb=context.glm.AddToLockMonitor) |
|
707 |
_generated_rpc.RpcClientJobQueue.__init__(self) |
|
637 | 708 |
|
638 | 709 |
|
639 |
class BootstrapRunner(_generated_rpc.RpcClientBootstrap): |
|
710 |
class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
|
|
640 | 711 |
"""RPC wrappers for bootstrapping. |
641 | 712 |
|
642 | 713 |
""" |
... | ... | |
644 | 715 |
"""Initializes this class. |
645 | 716 |
|
646 | 717 |
""" |
718 |
_RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get) |
|
647 | 719 |
_generated_rpc.RpcClientBootstrap.__init__(self) |
648 | 720 |
|
649 |
self._proc = _RpcProcessor(_SsconfResolver, |
|
650 |
netutils.GetDaemonPort(constants.NODED)) |
|
651 |
|
|
652 |
def _Call(self, node_list, procedure, timeout, args): |
|
653 |
"""Entry point for automatically generated RPC wrappers. |
|
654 |
|
|
655 |
""" |
|
656 |
body = serializer.DumpJson(args, indent=False) |
|
657 |
|
|
658 |
return self._proc(node_list, procedure, body, read_timeout=timeout) |
|
659 |
|
|
660 | 721 |
|
661 |
class ConfigRunner(_generated_rpc.RpcClientConfig): |
|
722 |
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
|
|
662 | 723 |
"""RPC wrappers for L{config}. |
663 | 724 |
|
664 | 725 |
""" |
665 |
_PrepareFileUpload = \ |
|
666 |
staticmethod(RpcRunner._PrepareFileUpload) # pylint: disable=W0212 |
|
667 |
|
|
668 | 726 |
def __init__(self, address_list): |
669 | 727 |
"""Initializes this class. |
670 | 728 |
|
671 | 729 |
""" |
672 |
_generated_rpc.RpcClientConfig.__init__(self) |
|
673 |
|
|
674 | 730 |
if address_list is None: |
675 | 731 |
resolver = _SsconfResolver |
676 | 732 |
else: |
677 | 733 |
# Caller provided an address list |
678 | 734 |
resolver = _StaticResolver(address_list) |
679 | 735 |
|
680 |
self._proc = _RpcProcessor(resolver, |
|
681 |
netutils.GetDaemonPort(constants.NODED)) |
|
682 |
|
|
683 |
def _Call(self, node_list, procedure, timeout, args): |
|
684 |
"""Entry point for automatically generated RPC wrappers. |
|
685 |
|
|
686 |
""" |
|
687 |
body = serializer.DumpJson(args, indent=False) |
|
688 |
|
|
689 |
return self._proc(node_list, procedure, body, read_timeout=timeout) |
|
736 |
_RpcClientBase.__init__(self, resolver, _ENCODERS.get) |
|
737 |
_generated_rpc.RpcClientConfig.__init__(self) |
Also available in: Unified diff