Revision 0c3d9c7c lib/rpc.py

b/lib/rpc.py
126 126
  return wrapper
127 127

  
128 128

  
129
def _Compress(data):
129
def _Compress(_, data):
130 130
  """Compresses a string for transport over RPC.
131 131

  
132 132
  Small amounts of data are not compressed.
......
460 460
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
461 461

  
462 462
  @staticmethod
463
  def _EncodeArg(encoder_fn, (argkind, value)):
463
  def _EncodeArg(encoder_fn, node, (argkind, value)):
464 464
    """Encode argument.
465 465

  
466 466
    """
467 467
    if argkind is None:
468 468
      return value
469 469
    else:
470
      return encoder_fn(argkind)(value)
470
      return encoder_fn(argkind)(node, value)
471 471

  
472 472
  def _Call(self, cdef, node_list, args):
473 473
    """Entry point for automatically generated RPC wrappers.
......
489 489
    if len(args) != len(argdefs):
490 490
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
491 491

  
492
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
493 492
    if prep_fn is None:
494
      # for a no-op prep_fn, we serialise the body once, and then we
495
      # reuse it in the dictionary values
496
      body = serializer.DumpJson(enc_args)
497
      pnbody = dict((n, body) for n in node_list)
498
    else:
499
      # for a custom prep_fn, we pass the encoded arguments and the
500
      # node name to the prep_fn, and we serialise its return value
501
      assert callable(prep_fn)
502
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
503
                    for n in node_list)
493
      prep_fn = lambda _, args: args
494
    assert callable(prep_fn)
495

  
496
    # encode the arguments for each node individually, pass them and the node
497
    # name to the prep_fn, and serialise its return value
498
    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
499
                                      zip(map(compat.snd, argdefs), args))
500
    pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
501
                  for n in node_list)
504 502

  
505 503
    result = self._proc(node_list, procedure, pnbody, read_timeout,
506 504
                        req_resolver_opts)
......
512 510
      return result
513 511

  
514 512

  
515
def _ObjectToDict(value):
513
def _ObjectToDict(_, value):
516 514
  """Converts an object to a dictionary.
517 515

  
518 516
  @note: See L{objects}.
......
521 519
  return value.ToDict()
522 520

  
523 521

  
524
def _ObjectListToDict(value):
522
def _ObjectListToDict(node, value):
525 523
  """Converts a list of L{objects} to dictionaries.
526 524

  
527 525
  """
528
  return map(_ObjectToDict, value)
526
  return map(compat.partial(_ObjectToDict, node), value)
529 527

  
530 528

  
531
def _EncodeNodeToDiskDict(value):
532
  """Encodes a dictionary with node name as key and disk objects as values.
533

  
534
  """
535
  return dict((name, _ObjectListToDict(disks))
536
              for name, disks in value.items())
537

  
538

  
539
def _PrepareFileUpload(getents_fn, filename):
529
def _PrepareFileUpload(getents_fn, node, filename):
540 530
  """Loads a file and prepares it for an upload to nodes.
541 531

  
542 532
  """
543 533
  statcb = utils.FileStatHelper()
544
  data = _Compress(utils.ReadFile(filename, preread=statcb))
534
  data = _Compress(node, utils.ReadFile(filename, preread=statcb))
545 535
  st = statcb.st
546 536

  
547 537
  if getents_fn is None:
......
555 545
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
556 546

  
557 547

  
558
def _PrepareFinalizeExportDisks(snap_disks):
548
def _PrepareFinalizeExportDisks(_, snap_disks):
559 549
  """Encodes disks for finalizing export.
560 550

  
561 551
  """
......
570 560
  return flat_disks
571 561

  
572 562

  
573
def _EncodeImportExportIO((ieio, ieioargs)):
574
  """Encodes import/export I/O information.
575

  
576
  """
577
  if ieio == constants.IEIO_RAW_DISK:
578
    assert len(ieioargs) == 1
579
    return (ieio, (ieioargs[0].ToDict(), ))
580

  
581
  if ieio == constants.IEIO_SCRIPT:
582
    assert len(ieioargs) == 2
583
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
584

  
585
  return (ieio, ieioargs)
586

  
587

  
588
def _EncodeBlockdevRename(value):
563
def _EncodeBlockdevRename(_, value):
589 564
  """Encodes information for renaming block devices.
590 565

  
591 566
  """
......
683 658
  return disk
684 659

  
685 660

  
686
def AnnotateDiskParams(template, disks, disk_params):
661
def AnnotateDiskParams(disks, disk_params):
687 662
  """Annotates the disk objects with the disk parameters.
688 663

  
689
  @param template: The disk template used
690 664
  @param disks: The list of disks objects to annotate
691
  @param disk_params: The disk paramaters for annotation
665
  @param disk_params: The disk parameters for annotation
692 666
  @returns: A list of disk objects annotated
693 667

  
694 668
  """
695
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
669
  def AnnotateDisk(disk):
670
    if disk.dev_type == constants.DT_DISKLESS:
671
      return disk
696 672

  
697
  if template == constants.DT_DRBD8:
698
    annotation_fn = _AnnotateDParamsDRBD
699
  elif template == constants.DT_DISKLESS:
700
    annotation_fn = lambda disk, _: disk
701
  else:
702
    annotation_fn = _AnnotateDParamsGeneric
673
    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)
703 674

  
704
  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
675
    if disk.dev_type == constants.DT_DRBD8:
676
      return _AnnotateDParamsDRBD(disk, ld_params)
677
    else:
678
      return _AnnotateDParamsGeneric(disk, ld_params)
679

  
680
  return [AnnotateDisk(disk.Copy()) for disk in disks]
705 681

  
706 682

  
707 683
def _GetExclusiveStorageFlag(cfg, node_uuid):
......
785 761
_ENCODERS = {
786 762
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
787 763
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
788
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
789 764
  rpc_defs.ED_COMPRESS: _Compress,
790 765
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
791
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
792 766
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
793 767
  }
794 768

  
......
825 799
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
826 800
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
827 801
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
802
      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,
828 803

  
829 804
      # Encoders with special requirements
830 805
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
806

  
807
      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
831 808
      })
832 809

  
833 810
    # Resolver using configuration
......
846 823
    _generated_rpc.RpcClientDnsOnly.__init__(self)
847 824
    _generated_rpc.RpcClientDefault.__init__(self)
848 825

  
849
  def _NicDict(self, nic):
826
  def _NicDict(self, _, nic):
850 827
    """Convert the given nic to a dict and encapsulate netinfo
851 828

  
852 829
    """
......
858 835
        n.netinfo = objects.Network.ToDict(nobj)
859 836
    return n.ToDict()
860 837

  
861
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
838
  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
862 839
    """Convert the given instance to a dict.
863 840

  
864 841
    This is done via the instance's ToDict() method and additionally
......
888 865
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
889 866
    if osp is not None:
890 867
      idict["osparams"].update(osp)
891
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
868
    idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
892 869
    for nic in idict["nics"]:
893 870
      nic["nicparams"] = objects.FillDict(
894 871
        cluster.nicparams[constants.PP_DEFAULT],
......
901 878
          nic["netinfo"] = objects.Network.ToDict(nobj)
902 879
    return idict
903 880

  
904
  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
881
  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
905 882
    """Wrapper for L{_InstDict}.
906 883

  
907 884
    """
908
    return self._InstDict(instance, hvp=hvp, bep=bep)
885
    return self._InstDict(node, instance, hvp=hvp, bep=bep)
909 886

  
910
  def _InstDictOspDp(self, (instance, osparams)):
887
  def _InstDictOspDp(self, node, (instance, osparams)):
911 888
    """Wrapper for L{_InstDict}.
912 889

  
913 890
    """
914
    return self._InstDict(instance, osp=osparams)
891
    return self._InstDict(node, instance, osp=osparams)
915 892

  
916
  def _DisksDictDP(self, (disks, instance)):
893
  def _DisksDictDP(self, node, (disks, instance)):
917 894
    """Wrapper for L{AnnotateDiskParams}.
918 895

  
919 896
    """
920 897
    diskparams = self._cfg.GetInstanceDiskParams(instance)
921
    return [disk.ToDict()
922
            for disk in AnnotateDiskParams(instance.disk_template,
923
                                           disks, diskparams)]
898
    ret = []
899
    for disk in AnnotateDiskParams(disks, diskparams):
900
      disk_node_uuids = disk.GetNodes(instance.primary_node)
901
      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
902
                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))
903

  
904
      disk.UpdateDynamicDiskParams(node, node_ips)
905

  
906
      ret.append(disk.ToDict())
924 907

  
925
  def _MultiDiskDictDP(self, disks_insts):
908
    return ret
909

  
910
  def _MultiDiskDictDP(self, node, disks_insts):
926 911
    """Wrapper for L{AnnotateDiskParams}.
927 912

  
928 913
    Supports a list of (disk, instance) tuples.
929 914
    """
930 915
    return [disk for disk_inst in disks_insts
931
            for disk in self._DisksDictDP(disk_inst)]
916
            for disk in self._DisksDictDP(node, disk_inst)]
932 917

  
933
  def _SingleDiskDictDP(self, (disk, instance)):
918
  def _SingleDiskDictDP(self, node, (disk, instance)):
934 919
    """Wrapper for L{AnnotateDiskParams}.
935 920

  
936 921
    """
937
    (anno_disk,) = self._DisksDictDP(([disk], instance))
922
    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
938 923
    return anno_disk
939 924

  
925
  def _EncodeNodeToDiskDictDP(self, node, value):
926
    """Encode dict of node name -> list of (disk, instance) tuples as values.
927

  
928
    """
929
    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
930
                for name, disks in value.items())
931

  
932
  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
933
    """Encodes import/export I/O information.
934

  
935
    """
936
    if ieio == constants.IEIO_RAW_DISK:
937
      assert len(ieioargs) == 1
938
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ))
939

  
940
    if ieio == constants.IEIO_SCRIPT:
941
      assert len(ieioargs) == 2
942
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))
943

  
944
    return (ieio, ieioargs)
945

  
940 946

  
941 947
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
942 948
  """RPC wrappers for job queue.

Also available in: Unified diff