Revision 72737a7f lib/cmdlib.py

b/lib/cmdlib.py
32 32
import platform
33 33
import logging
34 34

  
35
from ganeti import rpc
36 35
from ganeti import ssh
37 36
from ganeti import logger
38 37
from ganeti import utils
......
67 66
  REQ_MASTER = True
68 67
  REQ_BGL = True
69 68

  
70
  def __init__(self, processor, op, context):
69
  def __init__(self, processor, op, context, rpc):
71 70
    """Constructor for LogicalUnit.
72 71

  
73 72
    This needs to be overriden in derived classes in order to check op
......
78 77
    self.op = op
79 78
    self.cfg = context.cfg
80 79
    self.context = context
80
    self.rpc = rpc
81 81
    # Dicts used to declare locking needs to mcpu
82 82
    self.needed_locks = None
83 83
    self.acquired_locks = {}
......
448 448
  """
449 449
  # check bridges existance
450 450
  brlist = [nic.bridge for nic in instance.nics]
451
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
451
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
452 452
    raise errors.OpPrereqError("one or more target bridges %s does not"
453 453
                               " exist on destination node '%s'" %
454 454
                               (brlist, instance.primary_node))
......
484 484

  
485 485
    """
486 486
    master = self.cfg.GetMasterNode()
487
    if not rpc.call_node_stop_master(master, False):
487
    if not self.rpc.call_node_stop_master(master, False):
488 488
      raise errors.OpExecError("Could not disable the master role")
489 489
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
490 490
    utils.CreateBackup(priv_key)
......
742 742
    local_checksums = utils.FingerprintFiles(file_names)
743 743

  
744 744
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
745
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
746
    all_instanceinfo = rpc.call_instance_list(nodelist, hypervisors)
747
    all_vglist = rpc.call_vg_list(nodelist)
745
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
746
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
747
    all_vglist = self.rpc.call_vg_list(nodelist)
748 748
    node_verify_param = {
749 749
      'filelist': file_names,
750 750
      'nodelist': nodelist,
......
752 752
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
753 753
                        for node in nodeinfo]
754 754
      }
755
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param,
756
                                      self.cfg.GetClusterName())
757
    all_rversion = rpc.call_version(nodelist)
758
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(),
759
                                   self.cfg.GetHypervisorType())
755
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
756
                                           self.cfg.GetClusterName())
757
    all_rversion = self.rpc.call_version(nodelist)
758
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
759
                                        self.cfg.GetHypervisorType())
760 760

  
761 761
    for node in nodelist:
762 762
      feedback_fn("* Verifying node %s" % node)
......
970 970
    if not nv_dict:
971 971
      return result
972 972

  
973
    node_lvs = rpc.call_volume_list(nodes, vg_name)
973
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
974 974

  
975 975
    to_act = set()
976 976
    for node in nodes:
......
1051 1051

  
1052 1052
    # shutdown the master IP
1053 1053
    master = self.cfg.GetMasterNode()
1054
    if not rpc.call_node_stop_master(master, False):
1054
    if not self.rpc.call_node_stop_master(master, False):
1055 1055
      raise errors.OpExecError("Could not disable the master role")
1056 1056

  
1057 1057
    try:
......
1069 1069
      logger.Debug("Copying updated ssconf data to all nodes")
1070 1070
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1071 1071
        fname = ss.KeyToFilename(keyname)
1072
        result = rpc.call_upload_file(dist_nodes, fname)
1072
        result = self.rpc.call_upload_file(dist_nodes, fname)
1073 1073
        for to_node in dist_nodes:
1074 1074
          if not result[to_node]:
1075 1075
            logger.Error("copy of file %s to node %s failed" %
1076 1076
                         (fname, to_node))
1077 1077
    finally:
1078
      if not rpc.call_node_start_master(master, False):
1078
      if not self.rpc.call_node_start_master(master, False):
1079 1079
        logger.Error("Could not re-enable the master role on the master,"
1080 1080
                     " please restart manually.")
1081 1081

  
......
1145 1145
    # if vg_name not None, checks given volume group on all nodes
1146 1146
    if self.op.vg_name:
1147 1147
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1148
      vglist = rpc.call_vg_list(node_list)
1148
      vglist = self.rpc.call_vg_list(node_list)
1149 1149
      for node in node_list:
1150 1150
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1151 1151
                                              constants.MIN_VG_SIZE)
......
1184 1184
    max_time = 0
1185 1185
    done = True
1186 1186
    cumul_degraded = False
1187
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1187
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1188 1188
    if not rstats:
1189 1189
      lu.proc.LogWarning("Can't get any data from node %s" % node)
1190 1190
      retries += 1
......
1238 1238

  
1239 1239
  result = True
1240 1240
  if on_primary or dev.AssembleOnSecondary():
1241
    rstats = rpc.call_blockdev_find(node, dev)
1241
    rstats = lu.rpc.call_blockdev_find(node, dev)
1242 1242
    if not rstats:
1243 1243
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1244 1244
      result = False
......
1313 1313

  
1314 1314
    """
1315 1315
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1316
    node_data = rpc.call_os_diagnose(node_list)
1316
    node_data = self.rpc.call_os_diagnose(node_list)
1317 1317
    if node_data == False:
1318 1318
      raise errors.OpExecError("Can't gather the list of OSes")
1319 1319
    pol = self._DiagnoseByOS(node_list, node_data)
......
1403 1403

  
1404 1404
    self.context.RemoveNode(node.name)
1405 1405

  
1406
    rpc.call_node_leave_cluster(node.name)
1406
    self.rpc.call_node_leave_cluster(node.name)
1407 1407

  
1408 1408

  
1409 1409
class LUQueryNodes(NoHooksLU):
......
1475 1475

  
1476 1476
    if self.dynamic_fields.intersection(self.op.output_fields):
1477 1477
      live_data = {}
1478
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1479
                                     self.cfg.GetHypervisorType())
1478
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1479
                                          self.cfg.GetHypervisorType())
1480 1480
      for name in nodenames:
1481 1481
        nodeinfo = node_data.get(name, None)
1482 1482
        if nodeinfo:
......
1577 1577

  
1578 1578
    """
1579 1579
    nodenames = self.nodes
1580
    volumes = rpc.call_node_volumes(nodenames)
1580
    volumes = self.rpc.call_node_volumes(nodenames)
1581 1581

  
1582 1582
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1583 1583
             in self.cfg.GetInstanceList()]
......
1732 1732
    node = new_node.name
1733 1733

  
1734 1734
    # check connectivity
1735
    result = rpc.call_version([node])[node]
1735
    result = self.rpc.call_version([node])[node]
1736 1736
    if result:
1737 1737
      if constants.PROTOCOL_VERSION == result:
1738 1738
        logger.Info("communication to node %s fine, sw version %s match" %
......
1759 1759
      finally:
1760 1760
        f.close()
1761 1761

  
1762
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1763
                               keyarray[3], keyarray[4], keyarray[5])
1762
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1763
                                    keyarray[2],
1764
                                    keyarray[3], keyarray[4], keyarray[5])
1764 1765

  
1765 1766
    if not result:
1766 1767
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
......
1769 1770
    utils.AddHostToEtcHosts(new_node.name)
1770 1771

  
1771 1772
    if new_node.secondary_ip != new_node.primary_ip:
1772
      if not rpc.call_node_tcp_ping(new_node.name,
1773
                                    constants.LOCALHOST_IP_ADDRESS,
1774
                                    new_node.secondary_ip,
1775
                                    constants.DEFAULT_NODED_PORT,
1776
                                    10, False):
1773
      if not self.rpc.call_node_tcp_ping(new_node.name,
1774
                                         constants.LOCALHOST_IP_ADDRESS,
1775
                                         new_node.secondary_ip,
1776
                                         constants.DEFAULT_NODED_PORT,
1777
                                         10, False):
1777 1778
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1778 1779
                                 " you gave (%s). Please fix and re-run this"
1779 1780
                                 " command." % new_node.secondary_ip)
......
1784 1785
      # TODO: do a node-net-test as well?
1785 1786
    }
1786 1787

  
1787
    result = rpc.call_node_verify(node_verify_list, node_verify_param,
1788
                                  self.cfg.GetClusterName())
1788
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1789
                                       self.cfg.GetClusterName())
1789 1790
    for verifier in node_verify_list:
1790 1791
      if not result[verifier]:
1791 1792
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
......
1807 1808

  
1808 1809
    logger.Debug("Copying hosts and known_hosts to all nodes")
1809 1810
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1810
      result = rpc.call_upload_file(dist_nodes, fname)
1811
      result = self.rpc.call_upload_file(dist_nodes, fname)
1811 1812
      for to_node in dist_nodes:
1812 1813
        if not result[to_node]:
1813 1814
          logger.Error("copy of file %s to node %s failed" %
......
1817 1818
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1818 1819
      to_copy.append(constants.VNC_PASSWORD_FILE)
1819 1820
    for fname in to_copy:
1820
      result = rpc.call_upload_file([node], fname)
1821
      result = self.rpc.call_upload_file([node], fname)
1821 1822
      if not result[node]:
1822 1823
        logger.Error("could not copy file %s to node %s" % (fname, node))
1823 1824

  
......
1968 1969
  for inst_disk in instance.disks:
1969 1970
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1970 1971
      lu.cfg.SetDiskID(node_disk, node)
1971
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1972
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
1972 1973
      if not result:
1973 1974
        logger.Error("could not prepare block device %s on node %s"
1974 1975
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
......
1983 1984
      if node != instance.primary_node:
1984 1985
        continue
1985 1986
      lu.cfg.SetDiskID(node_disk, node)
1986
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1987
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
1987 1988
      if not result:
1988 1989
        logger.Error("could not prepare block device %s on node %s"
1989 1990
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
......
2054 2055
  _ShutdownInstanceDisks.
2055 2056

  
2056 2057
  """
2057
  ins_l = rpc.call_instance_list([instance.primary_node],
2058
                                 [instance.hypervisor])
2058
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2059
                                      [instance.hypervisor])
2059 2060
  ins_l = ins_l[instance.primary_node]
2060 2061
  if not type(ins_l) is list:
2061 2062
    raise errors.OpExecError("Can't contact node '%s'" %
......
2081 2082
  for disk in instance.disks:
2082 2083
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2083 2084
      lu.cfg.SetDiskID(top_disk, node)
2084
      if not rpc.call_blockdev_shutdown(node, top_disk):
2085
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2085 2086
        logger.Error("could not shutdown block device %s on node %s" %
2086 2087
                     (disk.iv_name, node))
2087 2088
        if not ignore_primary or node != instance.primary_node:
......
2111 2112
      we cannot check the node
2112 2113

  
2113 2114
  """
2114
  nodeinfo = rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2115
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2115 2116
  if not nodeinfo or not isinstance(nodeinfo, dict):
2116 2117
    raise errors.OpPrereqError("Could not contact node %s for resource"
2117 2118
                             " information" % (node,))
......
2189 2190

  
2190 2191
    _StartInstanceDisks(self, instance, force)
2191 2192

  
2192
    if not rpc.call_instance_start(node_current, instance, extra_args):
2193
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2193 2194
      _ShutdownInstanceDisks(self, instance)
2194 2195
      raise errors.OpExecError("Could not start instance")
2195 2196

  
......
2260 2261

  
2261 2262
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2262 2263
                       constants.INSTANCE_REBOOT_HARD]:
2263
      if not rpc.call_instance_reboot(node_current, instance,
2264
                                      reboot_type, extra_args):
2264
      if not self.rpc.call_instance_reboot(node_current, instance,
2265
                                           reboot_type, extra_args):
2265 2266
        raise errors.OpExecError("Could not reboot instance")
2266 2267
    else:
2267
      if not rpc.call_instance_shutdown(node_current, instance):
2268
      if not self.rpc.call_instance_shutdown(node_current, instance):
2268 2269
        raise errors.OpExecError("could not shutdown instance for full reboot")
2269 2270
      _ShutdownInstanceDisks(self, instance)
2270 2271
      _StartInstanceDisks(self, instance, ignore_secondaries)
2271
      if not rpc.call_instance_start(node_current, instance, extra_args):
2272
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2272 2273
        _ShutdownInstanceDisks(self, instance)
2273 2274
        raise errors.OpExecError("Could not start instance for full reboot")
2274 2275

  
......
2321 2322
    instance = self.instance
2322 2323
    node_current = instance.primary_node
2323 2324
    self.cfg.MarkInstanceDown(instance.name)
2324
    if not rpc.call_instance_shutdown(node_current, instance):
2325
    if not self.rpc.call_instance_shutdown(node_current, instance):
2325 2326
      logger.Error("could not shutdown instance")
2326 2327

  
2327 2328
    _ShutdownInstanceDisks(self, instance)
......
2372 2373
    if instance.status != "down":
2373 2374
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2374 2375
                                 self.op.instance_name)
2375
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
2376
                                         instance.hypervisor)
2376
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2377
                                              instance.name,
2378
                                              instance.hypervisor)
2377 2379
    if remote_info:
2378 2380
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2379 2381
                                 (self.op.instance_name,
......
2387 2389
      if pnode is None:
2388 2390
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2389 2391
                                   self.op.pnode)
2390
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2392
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2391 2393
      if not os_obj:
2392 2394
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2393 2395
                                   " primary node"  % self.op.os_type)
......
2408 2410
    _StartInstanceDisks(self, inst, None)
2409 2411
    try:
2410 2412
      feedback_fn("Running the instance OS create scripts...")
2411
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2413
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2414
                                           "sda", "sdb"):
2412 2415
        raise errors.OpExecError("Could not install OS for instance %s"
2413 2416
                                 " on node %s" %
2414 2417
                                 (inst.name, inst.primary_node))
......
2450 2453
    if instance.status != "down":
2451 2454
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2452 2455
                                 self.op.instance_name)
2453
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
2454
                                         instance.hypervisor)
2456
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2457
                                              instance.name,
2458
                                              instance.hypervisor)
2455 2459
    if remote_info:
2456 2460
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2457 2461
                                 (self.op.instance_name,
......
2493 2497

  
2494 2498
    if inst.disk_template == constants.DT_FILE:
2495 2499
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2496
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2497
                                                old_file_storage_dir,
2498
                                                new_file_storage_dir)
2500
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2501
                                                     old_file_storage_dir,
2502
                                                     new_file_storage_dir)
2499 2503

  
2500 2504
      if not result:
2501 2505
        raise errors.OpExecError("Could not connect to node '%s' to rename"
......
2512 2516

  
2513 2517
    _StartInstanceDisks(self, inst, None)
2514 2518
    try:
2515
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2516
                                          "sda", "sdb"):
2519
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2520
                                               old_name,
2521
                                               "sda", "sdb"):
2517 2522
        msg = ("Could not run OS rename script for instance %s on node %s"
2518 2523
               " (but the instance has been renamed in Ganeti)" %
2519 2524
               (inst.name, inst.primary_node))
......
2568 2573
    logger.Info("shutting down instance %s on node %s" %
2569 2574
                (instance.name, instance.primary_node))
2570 2575

  
2571
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2576
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2572 2577
      if self.op.ignore_failures:
2573 2578
        feedback_fn("Warning: can't shutdown instance")
2574 2579
      else:
......
2664 2669
    bad_nodes = []
2665 2670
    if self.dynamic_fields.intersection(self.op.output_fields):
2666 2671
      live_data = {}
2667
      node_data = rpc.call_all_instances_info(nodes, hv_list)
2672
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2668 2673
      for name in nodes:
2669 2674
        result = node_data[name]
2670 2675
        if result:
......
2820 2825

  
2821 2826
    # check bridge existance
2822 2827
    brlist = [nic.bridge for nic in instance.nics]
2823
    if not rpc.call_bridges_exist(target_node, brlist):
2828
    if not self.rpc.call_bridges_exist(target_node, brlist):
2824 2829
      raise errors.OpPrereqError("One or more target bridges %s does not"
2825 2830
                                 " exist on destination node '%s'" %
2826 2831
                                 (brlist, target_node))
......
2849 2854
    logger.Info("Shutting down instance %s on node %s" %
2850 2855
                (instance.name, source_node))
2851 2856

  
2852
    if not rpc.call_instance_shutdown(source_node, instance):
2857
    if not self.rpc.call_instance_shutdown(source_node, instance):
2853 2858
      if self.op.ignore_consistency:
2854 2859
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2855 2860
                     " anyway. Please make sure node %s is down"  %
......
2879 2884
        raise errors.OpExecError("Can't activate the instance's disks")
2880 2885

  
2881 2886
      feedback_fn("* starting the instance on the target node")
2882
      if not rpc.call_instance_start(target_node, instance, None):
2887
      if not self.rpc.call_instance_start(target_node, instance, None):
2883 2888
        _ShutdownInstanceDisks(self, instance)
2884 2889
        raise errors.OpExecError("Could not start instance %s on node %s." %
2885 2890
                                 (instance.name, target_node))
......
2897 2902
        return False
2898 2903

  
2899 2904
  lu.cfg.SetDiskID(device, node)
2900
  new_id = rpc.call_blockdev_create(node, device, device.size,
2901
                                    instance.name, True, info)
2905
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2906
                                       instance.name, True, info)
2902 2907
  if not new_id:
2903 2908
    return False
2904 2909
  if device.physical_id is None:
......
2926 2931
  if not force:
2927 2932
    return True
2928 2933
  lu.cfg.SetDiskID(device, node)
2929
  new_id = rpc.call_blockdev_create(node, device, device.size,
2930
                                    instance.name, False, info)
2934
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2935
                                       instance.name, False, info)
2931 2936
  if not new_id:
2932 2937
    return False
2933 2938
  if device.physical_id is None:
......
3049 3054

  
3050 3055
  if instance.disk_template == constants.DT_FILE:
3051 3056
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3052
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3053
                                              file_storage_dir)
3057
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3058
                                                 file_storage_dir)
3054 3059

  
3055 3060
    if not result:
3056 3061
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
......
3101 3106
  for device in instance.disks:
3102 3107
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3103 3108
      lu.cfg.SetDiskID(disk, node)
3104
      if not rpc.call_blockdev_remove(node, disk):
3109
      if not lu.rpc.call_blockdev_remove(node, disk):
3105 3110
        logger.Error("could not remove block device %s on node %s,"
3106 3111
                     " continuing anyway" %
3107 3112
                     (device.iv_name, node))
......
3109 3114

  
3110 3115
  if instance.disk_template == constants.DT_FILE:
3111 3116
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3112
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3113
                                            file_storage_dir):
3117
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3118
                                               file_storage_dir):
3114 3119
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3115 3120
      result = False
3116 3121

  
......
3288 3293
             {"size": self.op.swap_size, "mode": "w"}]
3289 3294
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3290 3295
             "bridge": self.op.bridge}]
3291
    ial = IAllocator(self.cfg,
3296
    ial = IAllocator(self,
3292 3297
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3293 3298
                     name=self.op.instance_name,
3294 3299
                     disk_template=self.op.disk_template,
......
3365 3370
      src_node = self.op.src_node
3366 3371
      src_path = self.op.src_path
3367 3372

  
3368
      export_info = rpc.call_export_info(src_node, src_path)
3373
      export_info = self.rpc.call_export_info(src_node, src_path)
3369 3374

  
3370 3375
      if not export_info:
3371 3376
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
......
3435 3440
    # Check lv size requirements
3436 3441
    if req_size is not None:
3437 3442
      nodenames = [pnode.name] + self.secondaries
3438
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3439
                                    self.op.hypervisor)
3443
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3444
                                         self.op.hypervisor)
3440 3445
      for node in nodenames:
3441 3446
        info = nodeinfo.get(node, None)
3442 3447
        if not info:
......
3452 3457
                                     (node, info['vg_free'], req_size))
3453 3458

  
3454 3459
    # os verification
3455
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3460
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3456 3461
    if not os_obj:
3457 3462
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3458 3463
                                 " primary node"  % self.op.os_type)
......
3461 3466
      raise errors.OpPrereqError("Can't set instance kernel to none")
3462 3467

  
3463 3468
    # bridge check on primary node
3464
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3469
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3465 3470
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3466 3471
                                 " destination node '%s'" %
3467 3472
                                 (self.op.bridge, pnode.name))
......
3610 3615
    if iobj.disk_template != constants.DT_DISKLESS:
3611 3616
      if self.op.mode == constants.INSTANCE_CREATE:
3612 3617
        feedback_fn("* running the instance OS create scripts...")
3613
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3618
        if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3614 3619
          raise errors.OpExecError("could not add os for instance %s"
3615 3620
                                   " on node %s" %
3616 3621
                                   (instance, pnode_name))
......
3620 3625
        src_node = self.op.src_node
3621 3626
        src_image = self.src_image
3622 3627
        cluster_name = self.cfg.GetClusterName()
3623
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3624
                                           src_node, src_image, cluster_name):
3628
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3629
                                                src_node, src_image,
3630
                                                cluster_name):
3625 3631
          raise errors.OpExecError("Could not import os for instance"
3626 3632
                                   " %s on node %s" %
3627 3633
                                   (instance, pnode_name))
......
3633 3639
    if self.op.start:
3634 3640
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3635 3641
      feedback_fn("* starting instance...")
3636
      if not rpc.call_instance_start(pnode_name, iobj, None):
3642
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3637 3643
        raise errors.OpExecError("Could not start instance")
3638 3644

  
3639 3645

  
......
3668 3674
    instance = self.instance
3669 3675
    node = instance.primary_node
3670 3676

  
3671
    node_insts = rpc.call_instance_list([node],
3672
                                        [instance.hypervisor])[node]
3677
    node_insts = self.rpc.call_instance_list([node],
3678
                                             [instance.hypervisor])[node]
3673 3679
    if node_insts is False:
3674 3680
      raise errors.OpExecError("Can't connect to node %s." % node)
3675 3681

  
......
3729 3735
    """Compute a new secondary node using an IAllocator.
3730 3736

  
3731 3737
    """
3732
    ial = IAllocator(self.cfg,
3738
    ial = IAllocator(self,
3733 3739
                     mode=constants.IALLOCATOR_MODE_RELOC,
3734 3740
                     name=self.op.instance_name,
3735 3741
                     relocate_from=[self.sec_node])
......
3871 3877
    self.proc.LogStep(1, steps_total, "check device existence")
3872 3878
    info("checking volume groups")
3873 3879
    my_vg = cfg.GetVGName()
3874
    results = rpc.call_vg_list([oth_node, tgt_node])
3880
    results = self.rpc.call_vg_list([oth_node, tgt_node])
3875 3881
    if not results:
3876 3882
      raise errors.OpExecError("Can't list volume groups on the nodes")
3877 3883
    for node in oth_node, tgt_node:
......
3885 3891
      for node in tgt_node, oth_node:
3886 3892
        info("checking %s on %s" % (dev.iv_name, node))
3887 3893
        cfg.SetDiskID(dev, node)
3888
        if not rpc.call_blockdev_find(node, dev):
3894
        if not self.rpc.call_blockdev_find(node, dev):
3889 3895
          raise errors.OpExecError("Can't find device %s on node %s" %
3890 3896
                                   (dev.iv_name, node))
3891 3897

  
......
3933 3939
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3934 3940
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3935 3941
      info("detaching %s drbd from local storage" % dev.iv_name)
3936
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3942
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3937 3943
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3938 3944
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3939 3945
      #dev.children = []
......
3952 3958
      # build the rename list based on what LVs exist on the node
3953 3959
      rlist = []
3954 3960
      for to_ren in old_lvs:
3955
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3961
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3956 3962
        if find_res is not None: # device exists
3957 3963
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3958 3964

  
3959 3965
      info("renaming the old LVs on the target node")
3960
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3966
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3961 3967
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3962 3968
      # now we rename the new LVs to the old LVs
3963 3969
      info("renaming the new LVs on the target node")
3964 3970
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3965
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3971
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3966 3972
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3967 3973

  
3968 3974
      for old, new in zip(old_lvs, new_lvs):
......
3975 3981

  
3976 3982
      # now that the new lvs have the old name, we can add them to the device
3977 3983
      info("adding new mirror component on %s" % tgt_node)
3978
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3984
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3979 3985
        for new_lv in new_lvs:
3980
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3986
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
3981 3987
            warning("Can't rollback device %s", hint="manually cleanup unused"
3982 3988
                    " logical volumes")
3983 3989
        raise errors.OpExecError("Can't add local storage to drbd")
......
3996 4002
    # so check manually all the devices
3997 4003
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3998 4004
      cfg.SetDiskID(dev, instance.primary_node)
3999
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4005
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4000 4006
      if is_degr:
4001 4007
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4002 4008

  
......
4006 4012
      info("remove logical volumes for %s" % name)
4007 4013
      for lv in old_lvs:
4008 4014
        cfg.SetDiskID(lv, tgt_node)
4009
        if not rpc.call_blockdev_remove(tgt_node, lv):
4015
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
4010 4016
          warning("Can't remove old LV", hint="manually remove unused LVs")
4011 4017
          continue
4012 4018

  
......
4044 4050
    self.proc.LogStep(1, steps_total, "check device existence")
4045 4051
    info("checking volume groups")
4046 4052
    my_vg = cfg.GetVGName()
4047
    results = rpc.call_vg_list([pri_node, new_node])
4053
    results = self.rpc.call_vg_list([pri_node, new_node])
4048 4054
    if not results:
4049 4055
      raise errors.OpExecError("Can't list volume groups on the nodes")
4050 4056
    for node in pri_node, new_node:
......
4057 4063
        continue
4058 4064
      info("checking %s on %s" % (dev.iv_name, pri_node))
4059 4065
      cfg.SetDiskID(dev, pri_node)
4060
      if not rpc.call_blockdev_find(pri_node, dev):
4066
      if not self.rpc.call_blockdev_find(pri_node, dev):
4061 4067
        raise errors.OpExecError("Can't find device %s on node %s" %
4062 4068
                                 (dev.iv_name, pri_node))
4063 4069

  
......
4124 4130
      # we have new devices, shutdown the drbd on the old secondary
4125 4131
      info("shutting down drbd for %s on old node" % dev.iv_name)
4126 4132
      cfg.SetDiskID(dev, old_node)
4127
      if not rpc.call_blockdev_shutdown(old_node, dev):
4133
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4128 4134
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4129 4135
                hint="Please cleanup this device manually as soon as possible")
4130 4136

  
......
4137 4143
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4138 4144
      # and 'find' the device, which will 'fix' it to match the
4139 4145
      # standalone state
4140
      if rpc.call_blockdev_find(pri_node, dev):
4146
      if self.rpc.call_blockdev_find(pri_node, dev):
4141 4147
        done += 1
4142 4148
      else:
4143 4149
        warning("Failed to detach drbd %s from network, unusual case" %
......
4169 4175
      # is correct
4170 4176
      cfg.SetDiskID(dev, pri_node)
4171 4177
      logging.debug("Disk to attach: %s", dev)
4172
      if not rpc.call_blockdev_find(pri_node, dev):
4178
      if not self.rpc.call_blockdev_find(pri_node, dev):
4173 4179
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4174 4180
                "please do a gnt-instance info to see the status of disks")
4175 4181

  
......
4182 4188
    # so check manually all the devices
4183 4189
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4184 4190
      cfg.SetDiskID(dev, pri_node)
4185
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4191
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4186 4192
      if is_degr:
4187 4193
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4188 4194

  
......
4191 4197
      info("remove logical volumes for %s" % name)
4192 4198
      for lv in old_lvs:
4193 4199
        cfg.SetDiskID(lv, old_node)
4194
        if not rpc.call_blockdev_remove(old_node, lv):
4200
        if not self.rpc.call_blockdev_remove(old_node, lv):
4195 4201
          warning("Can't remove LV on old secondary",
4196 4202
                  hint="Cleanup stale volumes by hand")
4197 4203

  
......
4280 4286
                                 (self.op.disk, instance.name))
4281 4287

  
4282 4288
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4283
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4284
                                  instance.hypervisor)
4289
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4290
                                       instance.hypervisor)
4285 4291
    for node in nodenames:
4286 4292
      info = nodeinfo.get(node, None)
4287 4293
      if not info:
......
4304 4310
    disk = instance.FindDisk(self.op.disk)
4305 4311
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4306 4312
      self.cfg.SetDiskID(disk, node)
4307
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4308
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4313
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4314
      if (not result or not isinstance(result, (list, tuple)) or
4315
          len(result) != 2):
4309 4316
        raise errors.OpExecError("grow request failed to node %s" % node)
4310 4317
      elif not result[0]:
4311 4318
        raise errors.OpExecError("grow request failed to node %s: %s" %
......
4367 4374

  
4368 4375
    """
4369 4376
    self.cfg.SetDiskID(dev, instance.primary_node)
4370
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4377
    dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4371 4378
    if dev.dev_type in constants.LDS_DRBD:
4372 4379
      # we change the snode then (otherwise we use the one passed in)
4373 4380
      if dev.logical_id[0] == instance.primary_node:
......
4377 4384

  
4378 4385
    if snode:
4379 4386
      self.cfg.SetDiskID(dev, snode)
4380
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4387
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4381 4388
    else:
4382 4389
      dev_sstatus = None
4383 4390

  
......
4403 4410
    """Gather and return data"""
4404 4411
    result = {}
4405 4412
    for instance in self.wanted_instances:
4406
      remote_info = rpc.call_instance_info(instance.primary_node,
4407
                                           instance.name,
4408
                                           instance.hypervisor)
4413
      remote_info = self.rpc.call_instance_info(instance.primary_node,
4414
                                                instance.name,
4415
                                                instance.hypervisor)
4409 4416
      if remote_info and "state" in remote_info:
4410 4417
        remote_state = "up"
4411 4418
      else:
......
4629 4636
      pnode = self.instance.primary_node
4630 4637
      nodelist = [pnode]
4631 4638
      nodelist.extend(instance.secondary_nodes)
4632
      instance_info = rpc.call_instance_info(pnode, instance.name,
4633
                                             instance.hypervisor)
4634
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(),
4635
                                    instance.hypervisor)
4639
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
4640
                                                  instance.hypervisor)
4641
      nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
4642
                                         instance.hypervisor)
4636 4643

  
4637 4644
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4638 4645
        # Assume the primary node is unreachable and go ahead
......
4768 4775
      that node.
4769 4776

  
4770 4777
    """
4771
    return rpc.call_export_list(self.nodes)
4778
    return self.rpc.call_export_list(self.nodes)
4772 4779

  
4773 4780

  
4774 4781
class LUExportInstance(LogicalUnit):
......
4843 4850
    src_node = instance.primary_node
4844 4851
    if self.op.shutdown:
4845 4852
      # shutdown the instance, but not the disks
4846
      if not rpc.call_instance_shutdown(src_node, instance):
4853
      if not self.rpc.call_instance_shutdown(src_node, instance):
4847 4854
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4848 4855
                                 (instance.name, src_node))
4849 4856

  
......
4855 4862
      for disk in instance.disks:
4856 4863
        if disk.iv_name == "sda":
4857 4864
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4858
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4865
          new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4859 4866

  
4860 4867
          if not new_dev_name:
4861 4868
            logger.Error("could not snapshot block device %s on node %s" %
......
4869 4876

  
4870 4877
    finally:
4871 4878
      if self.op.shutdown and instance.status == "up":
4872
        if not rpc.call_instance_start(src_node, instance, None):
4879
        if not self.rpc.call_instance_start(src_node, instance, None):
4873 4880
          _ShutdownInstanceDisks(self, instance)
4874 4881
          raise errors.OpExecError("Could not start instance")
4875 4882

  
......
4877 4884

  
4878 4885
    cluster_name = self.cfg.GetClusterName()
4879 4886
    for dev in snap_disks:
4880
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4887
      if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4881 4888
                                      instance, cluster_name):
4882 4889
        logger.Error("could not export block device %s from node %s to node %s"
4883 4890
                     % (dev.logical_id[1], src_node, dst_node.name))
4884
      if not rpc.call_blockdev_remove(src_node, dev):
4891
      if not self.rpc.call_blockdev_remove(src_node, dev):
4885 4892
        logger.Error("could not remove snapshot block device %s from node %s" %
4886 4893
                     (dev.logical_id[1], src_node))
4887 4894

  
4888
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4895
    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4889 4896
      logger.Error("could not finalize export for instance %s on node %s" %
4890 4897
                   (instance.name, dst_node.name))
4891 4898

  
......
4896 4903
    # if we proceed the backup would be removed because OpQueryExports
4897 4904
    # substitutes an empty list with the full cluster node list.
4898 4905
    if nodelist:
4899
      exportlist = rpc.call_export_list(nodelist)
4906
      exportlist = self.rpc.call_export_list(nodelist)
4900 4907
      for node in exportlist:
4901 4908
        if instance.name in exportlist[node]:
4902
          if not rpc.call_export_remove(node, instance.name):
4909
          if not self.rpc.call_export_remove(node, instance.name):
4903 4910
            logger.Error("could not remove older export for instance %s"
4904 4911
                         " on node %s" % (instance.name, node))
4905 4912

  
......
4935 4942
      fqdn_warn = True
4936 4943
      instance_name = self.op.instance_name
4937 4944

  
4938
    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
4945
    exportlist = self.rpc.call_export_list(self.acquired_locks[
4946
      locking.LEVEL_NODE])
4939 4947
    found = False
4940 4948
    for node in exportlist:
4941 4949
      if instance_name in exportlist[node]:
4942 4950
        found = True
4943
        if not rpc.call_export_remove(node, instance_name):
4951
        if not self.rpc.call_export_remove(node, instance_name):
4944 4952
          logger.Error("could not remove export for instance %s"
4945 4953
                       " on node %s" % (instance_name, node))
4946 4954

  
......
5153 5161
      if not utils.TestDelay(self.op.duration):
5154 5162
        raise errors.OpExecError("Error during master delay test")
5155 5163
    if self.op.on_nodes:
5156
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5164
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5157 5165
      if not result:
5158 5166
        raise errors.OpExecError("Complete failure from rpc call")
5159 5167
      for node, node_result in result.items():
......
5183 5191
    "relocate_from",
5184 5192
    ]
5185 5193

  
5186
  def __init__(self, cfg, mode, name, **kwargs):
5187
    self.cfg = cfg
5194
  def __init__(self, lu, mode, name, **kwargs):
5195
    self.lu = lu
5188 5196
    # init buffer variables
5189 5197
    self.in_text = self.out_text = self.in_data = self.out_data = None
5190 5198
    # init all input fields so that pylint is happy
......
5221 5229
    This is the data that is independent of the actual operation.
5222 5230

  
5223 5231
    """
5224
    cfg = self.cfg
5232
    cfg = self.lu.cfg
5225 5233
    cluster_info = cfg.GetClusterInfo()
5226 5234
    # cluster data
5227 5235
    data = {
5228 5236
      "version": 1,
5229
      "cluster_name": self.cfg.GetClusterName(),
5237
      "cluster_name": cfg.GetClusterName(),
5230 5238
      "cluster_tags": list(cluster_info.GetTags()),
5231 5239
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5232 5240
      # we don't have job IDs
......
5239 5247
    node_list = cfg.GetNodeList()
5240 5248
    # FIXME: here we have only one hypervisor information, but
5241 5249
    # instance can belong to different hypervisors
5242
    node_data = rpc.call_node_info(node_list, cfg.GetVGName(),
5243
                                   cfg.GetHypervisorType())
5250
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5251
                                           cfg.GetHypervisorType())
5244 5252
    for nname in node_list:
5245 5253
      ninfo = cfg.GetNodeInfo(nname)
5246 5254
      if nname not in node_data or not isinstance(node_data[nname], dict):
......
5350 5358
    done.
5351 5359

  
5352 5360
    """
5353
    instance = self.cfg.GetInstanceInfo(self.name)
5361
    instance = self.lu.cfg.GetInstanceInfo(self.name)
5354 5362
    if instance is None:
5355 5363
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5356 5364
                                   " IAllocator" % self.name)
......
5389 5397

  
5390 5398
    self.in_text = serializer.Dump(self.in_data)
5391 5399

  
5392
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5400
  def Run(self, name, validate=True, call_fn=None):
5393 5401
    """Run an instance allocator and return the results.
5394 5402

  
5395 5403
    """
5404
    if call_fn is None:
5405
      call_fn = self.lu.rpc.call_iallocator_runner
5396 5406
    data = self.in_text
5397 5407

  
5398
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
5408
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5399 5409

  
5400 5410
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5401 5411
      raise errors.OpExecError("Invalid result from master iallocator runner")
......
5508 5518

  
5509 5519
    """
5510 5520
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5511
      ial = IAllocator(self.cfg,
5521
      ial = IAllocator(self,
5512 5522
                       mode=self.op.mode,
5513 5523
                       name=self.op.name,
5514 5524
                       mem_size=self.op.mem_size,
......
5520 5530
                       vcpus=self.op.vcpus,
5521 5531
                       )
5522 5532
    else:
5523
      ial = IAllocator(self.cfg,
5533
      ial = IAllocator(self,
5524 5534
                       mode=self.op.mode,
5525 5535
                       name=self.op.name,
5526 5536
                       relocate_from=list(self.relocate_from),

Also available in: Unified diff