Revision 0c3d9c7c lib/rpc.py
b/lib/rpc.py | ||
---|---|---|
126 | 126 |
return wrapper |
127 | 127 |
|
128 | 128 |
|
129 |
def _Compress(data): |
|
129 |
def _Compress(_, data):
|
|
130 | 130 |
"""Compresses a string for transport over RPC. |
131 | 131 |
|
132 | 132 |
Small amounts of data are not compressed. |
... | ... | |
460 | 460 |
self._encoder = compat.partial(self._EncodeArg, encoder_fn) |
461 | 461 |
|
462 | 462 |
@staticmethod |
463 |
def _EncodeArg(encoder_fn, (argkind, value)): |
|
463 |
def _EncodeArg(encoder_fn, node, (argkind, value)):
|
|
464 | 464 |
"""Encode argument. |
465 | 465 |
|
466 | 466 |
""" |
467 | 467 |
if argkind is None: |
468 | 468 |
return value |
469 | 469 |
else: |
470 |
return encoder_fn(argkind)(value) |
|
470 |
return encoder_fn(argkind)(node, value)
|
|
471 | 471 |
|
472 | 472 |
def _Call(self, cdef, node_list, args): |
473 | 473 |
"""Entry point for automatically generated RPC wrappers. |
... | ... | |
489 | 489 |
if len(args) != len(argdefs): |
490 | 490 |
raise errors.ProgrammerError("Number of passed arguments doesn't match") |
491 | 491 |
|
492 |
enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args)) |
|
493 | 492 |
if prep_fn is None: |
494 |
# for a no-op prep_fn, we serialise the body once, and then we |
|
495 |
# reuse it in the dictionary values |
|
496 |
body = serializer.DumpJson(enc_args) |
|
497 |
pnbody = dict((n, body) for n in node_list) |
|
498 |
else: |
|
499 |
# for a custom prep_fn, we pass the encoded arguments and the |
|
500 |
# node name to the prep_fn, and we serialise its return value |
|
501 |
assert callable(prep_fn) |
|
502 |
pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args))) |
|
503 |
for n in node_list) |
|
493 |
prep_fn = lambda _, args: args |
|
494 |
assert callable(prep_fn) |
|
495 |
|
|
496 |
# encode the arguments for each node individually, pass them and the node |
|
497 |
# name to the prep_fn, and serialise its return value |
|
498 |
encode_args_fn = lambda node: map(compat.partial(self._encoder, node), |
|
499 |
zip(map(compat.snd, argdefs), args)) |
|
500 |
pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n)))) |
|
501 |
for n in node_list) |
|
504 | 502 |
|
505 | 503 |
result = self._proc(node_list, procedure, pnbody, read_timeout, |
506 | 504 |
req_resolver_opts) |
... | ... | |
512 | 510 |
return result |
513 | 511 |
|
514 | 512 |
|
515 |
def _ObjectToDict(value): |
|
513 |
def _ObjectToDict(_, value):
|
|
516 | 514 |
"""Converts an object to a dictionary. |
517 | 515 |
|
518 | 516 |
@note: See L{objects}. |
... | ... | |
521 | 519 |
return value.ToDict() |
522 | 520 |
|
523 | 521 |
|
524 |
def _ObjectListToDict(value): |
|
522 |
def _ObjectListToDict(node, value):
|
|
525 | 523 |
"""Converts a list of L{objects} to dictionaries. |
526 | 524 |
|
527 | 525 |
""" |
528 |
return map(_ObjectToDict, value)
|
|
526 |
return map(compat.partial(_ObjectToDict, node), value)
|
|
529 | 527 |
|
530 | 528 |
|
531 |
def _EncodeNodeToDiskDict(value): |
|
532 |
"""Encodes a dictionary with node name as key and disk objects as values. |
|
533 |
|
|
534 |
""" |
|
535 |
return dict((name, _ObjectListToDict(disks)) |
|
536 |
for name, disks in value.items()) |
|
537 |
|
|
538 |
|
|
539 |
def _PrepareFileUpload(getents_fn, filename): |
|
529 |
def _PrepareFileUpload(getents_fn, node, filename): |
|
540 | 530 |
"""Loads a file and prepares it for an upload to nodes. |
541 | 531 |
|
542 | 532 |
""" |
543 | 533 |
statcb = utils.FileStatHelper() |
544 |
data = _Compress(utils.ReadFile(filename, preread=statcb)) |
|
534 |
data = _Compress(node, utils.ReadFile(filename, preread=statcb))
|
|
545 | 535 |
st = statcb.st |
546 | 536 |
|
547 | 537 |
if getents_fn is None: |
... | ... | |
555 | 545 |
getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime] |
556 | 546 |
|
557 | 547 |
|
558 |
def _PrepareFinalizeExportDisks(snap_disks): |
|
548 |
def _PrepareFinalizeExportDisks(_, snap_disks):
|
|
559 | 549 |
"""Encodes disks for finalizing export. |
560 | 550 |
|
561 | 551 |
""" |
... | ... | |
570 | 560 |
return flat_disks |
571 | 561 |
|
572 | 562 |
|
573 |
def _EncodeImportExportIO((ieio, ieioargs)): |
|
574 |
"""Encodes import/export I/O information. |
|
575 |
|
|
576 |
""" |
|
577 |
if ieio == constants.IEIO_RAW_DISK: |
|
578 |
assert len(ieioargs) == 1 |
|
579 |
return (ieio, (ieioargs[0].ToDict(), )) |
|
580 |
|
|
581 |
if ieio == constants.IEIO_SCRIPT: |
|
582 |
assert len(ieioargs) == 2 |
|
583 |
return (ieio, (ieioargs[0].ToDict(), ieioargs[1])) |
|
584 |
|
|
585 |
return (ieio, ieioargs) |
|
586 |
|
|
587 |
|
|
588 |
def _EncodeBlockdevRename(value): |
|
563 |
def _EncodeBlockdevRename(_, value): |
|
589 | 564 |
"""Encodes information for renaming block devices. |
590 | 565 |
|
591 | 566 |
""" |
... | ... | |
683 | 658 |
return disk |
684 | 659 |
|
685 | 660 |
|
686 |
def AnnotateDiskParams(template, disks, disk_params):
|
|
661 |
def AnnotateDiskParams(disks, disk_params): |
|
687 | 662 |
"""Annotates the disk objects with the disk parameters. |
688 | 663 |
|
689 |
@param template: The disk template used |
|
690 | 664 |
@param disks: The list of disks objects to annotate |
691 |
@param disk_params: The disk paramaters for annotation
|
|
665 |
@param disk_params: The disk parameters for annotation
|
|
692 | 666 |
@returns: A list of disk objects annotated |
693 | 667 |
|
694 | 668 |
""" |
695 |
ld_params = objects.Disk.ComputeLDParams(template, disk_params) |
|
669 |
def AnnotateDisk(disk): |
|
670 |
if disk.dev_type == constants.DT_DISKLESS: |
|
671 |
return disk |
|
696 | 672 |
|
697 |
if template == constants.DT_DRBD8: |
|
698 |
annotation_fn = _AnnotateDParamsDRBD |
|
699 |
elif template == constants.DT_DISKLESS: |
|
700 |
annotation_fn = lambda disk, _: disk |
|
701 |
else: |
|
702 |
annotation_fn = _AnnotateDParamsGeneric |
|
673 |
ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params) |
|
703 | 674 |
|
704 |
return [annotation_fn(disk.Copy(), ld_params) for disk in disks] |
|
675 |
if disk.dev_type == constants.DT_DRBD8: |
|
676 |
return _AnnotateDParamsDRBD(disk, ld_params) |
|
677 |
else: |
|
678 |
return _AnnotateDParamsGeneric(disk, ld_params) |
|
679 |
|
|
680 |
return [AnnotateDisk(disk.Copy()) for disk in disks] |
|
705 | 681 |
|
706 | 682 |
|
707 | 683 |
def _GetExclusiveStorageFlag(cfg, node_uuid): |
... | ... | |
785 | 761 |
_ENCODERS = { |
786 | 762 |
rpc_defs.ED_OBJECT_DICT: _ObjectToDict, |
787 | 763 |
rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict, |
788 |
rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict, |
|
789 | 764 |
rpc_defs.ED_COMPRESS: _Compress, |
790 | 765 |
rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks, |
791 |
rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO, |
|
792 | 766 |
rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename, |
793 | 767 |
} |
794 | 768 |
|
... | ... | |
825 | 799 |
rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP, |
826 | 800 |
rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP, |
827 | 801 |
rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP, |
802 |
rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP, |
|
828 | 803 |
|
829 | 804 |
# Encoders with special requirements |
830 | 805 |
rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents), |
806 |
|
|
807 |
rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO, |
|
831 | 808 |
}) |
832 | 809 |
|
833 | 810 |
# Resolver using configuration |
... | ... | |
846 | 823 |
_generated_rpc.RpcClientDnsOnly.__init__(self) |
847 | 824 |
_generated_rpc.RpcClientDefault.__init__(self) |
848 | 825 |
|
849 |
def _NicDict(self, nic): |
|
826 |
def _NicDict(self, _, nic):
|
|
850 | 827 |
"""Convert the given nic to a dict and encapsulate netinfo |
851 | 828 |
|
852 | 829 |
""" |
... | ... | |
858 | 835 |
n.netinfo = objects.Network.ToDict(nobj) |
859 | 836 |
return n.ToDict() |
860 | 837 |
|
861 |
def _InstDict(self, instance, hvp=None, bep=None, osp=None): |
|
838 |
def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
|
|
862 | 839 |
"""Convert the given instance to a dict. |
863 | 840 |
|
864 | 841 |
This is done via the instance's ToDict() method and additionally |
... | ... | |
888 | 865 |
idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams) |
889 | 866 |
if osp is not None: |
890 | 867 |
idict["osparams"].update(osp) |
891 |
idict["disks"] = self._DisksDictDP((instance.disks, instance)) |
|
868 |
idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
|
|
892 | 869 |
for nic in idict["nics"]: |
893 | 870 |
nic["nicparams"] = objects.FillDict( |
894 | 871 |
cluster.nicparams[constants.PP_DEFAULT], |
... | ... | |
901 | 878 |
nic["netinfo"] = objects.Network.ToDict(nobj) |
902 | 879 |
return idict |
903 | 880 |
|
904 |
def _InstDictHvpBepDp(self, (instance, hvp, bep)): |
|
881 |
def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
|
|
905 | 882 |
"""Wrapper for L{_InstDict}. |
906 | 883 |
|
907 | 884 |
""" |
908 |
return self._InstDict(instance, hvp=hvp, bep=bep) |
|
885 |
return self._InstDict(node, instance, hvp=hvp, bep=bep)
|
|
909 | 886 |
|
910 |
def _InstDictOspDp(self, (instance, osparams)): |
|
887 |
def _InstDictOspDp(self, node, (instance, osparams)):
|
|
911 | 888 |
"""Wrapper for L{_InstDict}. |
912 | 889 |
|
913 | 890 |
""" |
914 |
return self._InstDict(instance, osp=osparams) |
|
891 |
return self._InstDict(node, instance, osp=osparams)
|
|
915 | 892 |
|
916 |
def _DisksDictDP(self, (disks, instance)): |
|
893 |
def _DisksDictDP(self, node, (disks, instance)):
|
|
917 | 894 |
"""Wrapper for L{AnnotateDiskParams}. |
918 | 895 |
|
919 | 896 |
""" |
920 | 897 |
diskparams = self._cfg.GetInstanceDiskParams(instance) |
921 |
return [disk.ToDict() |
|
922 |
for disk in AnnotateDiskParams(instance.disk_template, |
|
923 |
disks, diskparams)] |
|
898 |
ret = [] |
|
899 |
for disk in AnnotateDiskParams(disks, diskparams): |
|
900 |
disk_node_uuids = disk.GetNodes(instance.primary_node) |
|
901 |
node_ips = dict((uuid, node.secondary_ip) for (uuid, node) |
|
902 |
in self._cfg.GetMultiNodeInfo(disk_node_uuids)) |
|
903 |
|
|
904 |
disk.UpdateDynamicDiskParams(node, node_ips) |
|
905 |
|
|
906 |
ret.append(disk.ToDict()) |
|
924 | 907 |
|
925 |
def _MultiDiskDictDP(self, disks_insts): |
|
908 |
return ret |
|
909 |
|
|
910 |
def _MultiDiskDictDP(self, node, disks_insts): |
|
926 | 911 |
"""Wrapper for L{AnnotateDiskParams}. |
927 | 912 |
|
928 | 913 |
Supports a list of (disk, instance) tuples. |
929 | 914 |
""" |
930 | 915 |
return [disk for disk_inst in disks_insts |
931 |
for disk in self._DisksDictDP(disk_inst)] |
|
916 |
for disk in self._DisksDictDP(node, disk_inst)]
|
|
932 | 917 |
|
933 |
def _SingleDiskDictDP(self, (disk, instance)): |
|
918 |
def _SingleDiskDictDP(self, node, (disk, instance)):
|
|
934 | 919 |
"""Wrapper for L{AnnotateDiskParams}. |
935 | 920 |
|
936 | 921 |
""" |
937 |
(anno_disk,) = self._DisksDictDP(([disk], instance)) |
|
922 |
(anno_disk,) = self._DisksDictDP(node, ([disk], instance))
|
|
938 | 923 |
return anno_disk |
939 | 924 |
|
925 |
def _EncodeNodeToDiskDictDP(self, node, value): |
|
926 |
"""Encode dict of node name -> list of (disk, instance) tuples as values. |
|
927 |
|
|
928 |
""" |
|
929 |
return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks]) |
|
930 |
for name, disks in value.items()) |
|
931 |
|
|
932 |
def _EncodeImportExportIO(self, node, (ieio, ieioargs)): |
|
933 |
"""Encodes import/export I/O information. |
|
934 |
|
|
935 |
""" |
|
936 |
if ieio == constants.IEIO_RAW_DISK: |
|
937 |
assert len(ieioargs) == 1 |
|
938 |
return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), )) |
|
939 |
|
|
940 |
if ieio == constants.IEIO_SCRIPT: |
|
941 |
assert len(ieioargs) == 2 |
|
942 |
return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1])) |
|
943 |
|
|
944 |
return (ieio, ieioargs) |
|
945 |
|
|
940 | 946 |
|
941 | 947 |
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue): |
942 | 948 |
"""RPC wrappers for job queue. |
Also available in: Unified diff