Revision 72737a7f

b/daemons/ganeti-masterd
    # either single node cluster, or a misconfiguration, but I won't
    # break any other node, so I can proceed
    return True
-  results = rpc.call_master_info(node_list)
+  results = rpc.RpcRunner.call_master_info(node_list)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
......

  # activate ip
  master_node = ssconf.SimpleConfigReader().GetMasterNode()
-  if not rpc.call_node_start_master(master_node, False):
+  if not rpc.RpcRunner.call_node_start_master(master_node, False):
    logging.error("Can't activate master IP address")

  master.setup_queue()
b/lib/bootstrap.py
from ganeti import objects
from ganeti import ssconf

+from ganeti.rpc import RpcRunner

def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.
......

  # start the master ip
  # TODO: Review rpc call from bootstrap
-  rpc.call_node_start_master(hostname.name, True)
+  RpcRunner.call_node_start_master(hostname.name, True)


def InitConfig(version, cluster_config, master_node_config,
......
  begun in cmdlib.LUDestroyOpcode.

  """
-  if not rpc.call_node_stop_master(master, True):
+  if not RpcRunner.call_node_stop_master(master, True):
    logging.warning("Could not disable the master role")
-  if not rpc.call_node_leave_cluster(master):
+  if not RpcRunner.call_node_leave_cluster(master):
    logging.warning("Could not shutdown the node daemon and cleanup the node")


......

  logging.info("setting master to %s, old master: %s", new_master, old_master)

-  if not rpc.call_node_stop_master(old_master, True):
+  if not RpcRunner.call_node_stop_master(old_master, True):
    logging.error("could disable the master role on the old master"
                 " %s, please disable manually", old_master)

......

  # Here we have a phase where no master should be running

-  if not rpc.call_upload_file(cfg.GetNodeList(),
-                              constants.CLUSTER_CONF_FILE):
+  if not RpcRunner.call_upload_file(cfg.GetNodeList(),
+                                    constants.CLUSTER_CONF_FILE):
    logging.error("could not distribute the new simple store master file"
                  " to the other nodes, please check.")

-  if not rpc.call_node_start_master(new_master, True):
+  if not RpcRunner.call_node_start_master(new_master, True):
    logging.error("could not start the master role on the new master"
                  " %s, please check", new_master)
    rcode = 1
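
The bootstrap and master-failover paths above keep calling the RPC wrappers at class level, since at that point there is no Processor or LogicalUnit to hand them a configured runner. A minimal sketch of that style, using only calls visible in this revision (the function name and error handling are illustrative, not part of the patch):

import logging

from ganeti.rpc import RpcRunner

def _FailoverMasterRole(old_master, new_master):
  """Illustrative only: move the master role between two nodes."""
  rcode = 0
  if not RpcRunner.call_node_stop_master(old_master, True):
    logging.error("could not disable the master role on %s", old_master)
  if not RpcRunner.call_node_start_master(new_master, True):
    logging.error("could not start the master role on %s", new_master)
    rcode = 1
  return rcode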
b/lib/cmdlib.py
import platform
import logging

-from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
......
  REQ_MASTER = True
  REQ_BGL = True

-  def __init__(self, processor, op, context):
+  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
......
    self.op = op
    self.cfg = context.cfg
    self.context = context
+    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
......
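
Since mcpu.Processor now passes its RpcRunner into every LogicalUnit (see lib/mcpu.py below), LU code reaches the RPC layer as self.rpc instead of the removed module-level functions. A hypothetical minimal LU written against the new constructor signature, reusing only names that appear in this revision:

class LUListVolumeGroups(NoHooksLU):
  """Hypothetical LU: list volume groups on all nodes via self.rpc."""

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    node_list = self.cfg.GetNodeList()
    # call_vg_list is a multi-node call; the result is a dict keyed by node
    return self.rpc.call_vg_list(node_list)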
  """
  # check bridges existance
  brlist = [nic.bridge for nic in instance.nics]
-  if not rpc.call_bridges_exist(instance.primary_node, brlist):
+  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
......

    """
    master = self.cfg.GetMasterNode()
-    if not rpc.call_node_stop_master(master, False):
+    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
......
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
-    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
-    all_instanceinfo = rpc.call_instance_list(nodelist, hypervisors)
-    all_vglist = rpc.call_vg_list(nodelist)
+    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
+    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
+    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
......
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
-    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param,
-                                      self.cfg.GetClusterName())
-    all_rversion = rpc.call_version(nodelist)
-    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(),
-                                   self.cfg.GetHypervisorType())
+    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
+                                           self.cfg.GetClusterName())
+    all_rversion = self.rpc.call_version(nodelist)
+    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
+                                        self.cfg.GetHypervisorType())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
......
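
The verification code above relies on the usual shape of multi-node results: each multi-node call_* wrapper returns a mapping keyed by node name, which the caller then indexes per node (result[to_node], node_data.get(name, None), and so on), while the single-node wrappers in lib/rpc.py already reduce the mapping with .get(node, False). A small illustrative consumer of that shape (the helper name is made up):

def _NodesMissingFile(rpc_runner, dist_nodes, fname):
  """Illustrative helper: return the nodes where an upload failed."""
  result = rpc_runner.call_upload_file(dist_nodes, fname)
  return [node for node in dist_nodes if not result.get(node, False)]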
    if not nv_dict:
      return result

-    node_lvs = rpc.call_volume_list(nodes, vg_name)
+    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
......

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
-    if not rpc.call_node_stop_master(master, False):
+    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
......
      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
-        result = rpc.call_upload_file(dist_nodes, fname)
+        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
-      if not rpc.call_node_start_master(master, False):
+      if not self.rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")

......
    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.acquired_locks[locking.LEVEL_NODE]
-      vglist = rpc.call_vg_list(node_list)
+      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
......
    max_time = 0
    done = True
    cumul_degraded = False
-    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
+    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
......

  result = True
  if on_primary or dev.AssembleOnSecondary():
-    rstats = rpc.call_blockdev_find(node, dev)
+    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
......

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
-    node_data = rpc.call_os_diagnose(node_list)
+    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
......

    self.context.RemoveNode(node.name)

-    rpc.call_node_leave_cluster(node.name)
+    self.rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
......

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
-      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                     self.cfg.GetHypervisorType())
+      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
+                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
......

    """
    nodenames = self.nodes
-    volumes = rpc.call_node_volumes(nodenames)
+    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
......
    node = new_node.name

    # check connectivity
-    result = rpc.call_version([node])[node]
+    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
......
      finally:
        f.close()

-    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
-                               keyarray[3], keyarray[4], keyarray[5])
+    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
+                                    keyarray[2],
+                                    keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
......
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
-      if not rpc.call_node_tcp_ping(new_node.name,
-                                    constants.LOCALHOST_IP_ADDRESS,
-                                    new_node.secondary_ip,
-                                    constants.DEFAULT_NODED_PORT,
-                                    10, False):
+      if not self.rpc.call_node_tcp_ping(new_node.name,
+                                         constants.LOCALHOST_IP_ADDRESS,
+                                         new_node.secondary_ip,
+                                         constants.DEFAULT_NODED_PORT,
+                                         10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)
......
      # TODO: do a node-net-test as well?
    }

-    result = rpc.call_node_verify(node_verify_list, node_verify_param,
-                                  self.cfg.GetClusterName())
+    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
+                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
......

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
-      result = rpc.call_upload_file(dist_nodes, fname)
+      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
......
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
-      result = rpc.call_upload_file([node], fname)
+      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

......
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
-      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
......
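
The module-level helpers in cmdlib (disk assembly, shutdown, creation, removal) do not own a runner; as the hunks above and below show, they now take the calling LU as their first argument and reach both the configuration and the RPC layer through it. A condensed sketch of that pattern (the helper name is hypothetical):

def _CheckDisksOnPrimary(lu, instance):
  """Illustrative helper: verify the instance's disks on its primary node."""
  ok = True
  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, instance.primary_node)
    if not lu.rpc.call_blockdev_find(instance.primary_node, dev):
      ok = False
  return ok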
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
-      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
......
  _ShutdownInstanceDisks.

  """
-  ins_l = rpc.call_instance_list([instance.primary_node],
-                                 [instance.hypervisor])
+  ins_l = lu.rpc.call_instance_list([instance.primary_node],
+                                      [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
......
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
-      if not rpc.call_blockdev_shutdown(node, top_disk):
+      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
......
      we cannot check the node

  """
-  nodeinfo = rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
+  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))
......

    _StartInstanceDisks(self, instance, force)

-    if not rpc.call_instance_start(node_current, instance, extra_args):
+    if not self.rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")

......

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
-      if not rpc.call_instance_reboot(node_current, instance,
-                                      reboot_type, extra_args):
+      if not self.rpc.call_instance_reboot(node_current, instance,
+                                           reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
-      if not rpc.call_instance_shutdown(node_current, instance):
+      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
-      if not rpc.call_instance_start(node_current, instance, extra_args):
+      if not self.rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

......
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
-    if not rpc.call_instance_shutdown(node_current, instance):
+    if not self.rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)
......
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
-    remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
-                                         instance.hypervisor)
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
......
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
-      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
+      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)
......
    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
-      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
+      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
+                                           "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
......
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
-    remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
-                                         instance.hypervisor)
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
......

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
-      result = rpc.call_file_storage_dir_rename(inst.primary_node,
-                                                old_file_storage_dir,
-                                                new_file_storage_dir)
+      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
+                                                     old_file_storage_dir,
+                                                     new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
......

    _StartInstanceDisks(self, inst, None)
    try:
-      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
-                                          "sda", "sdb"):
+      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
+                                               old_name,
+                                               "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
......
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

-    if not rpc.call_instance_shutdown(instance.primary_node, instance):
+    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
......
    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
-      node_data = rpc.call_all_instances_info(nodes, hv_list)
+      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
......

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
-    if not rpc.call_bridges_exist(target_node, brlist):
+    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))
......
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

-    if not rpc.call_instance_shutdown(source_node, instance):
+    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
......
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
-      if not rpc.call_instance_start(target_node, instance, None):
+      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
......
        return False

  lu.cfg.SetDiskID(device, node)
-  new_id = rpc.call_blockdev_create(node, device, device.size,
-                                    instance.name, True, info)
+  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
+                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
......
  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
-  new_id = rpc.call_blockdev_create(node, device, device.size,
-                                    instance.name, False, info)
+  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
+                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
......

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    result = rpc.call_file_storage_dir_create(instance.primary_node,
-                                              file_storage_dir)
+    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
+                                                 file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
......
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
-      if not rpc.call_blockdev_remove(node, disk):
+      if not lu.rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
......

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    if not rpc.call_file_storage_dir_remove(instance.primary_node,
-                                            file_storage_dir):
+    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
+                                               file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

......
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
-    ial = IAllocator(self.cfg,
+    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
......
      src_node = self.op.src_node
      src_path = self.op.src_path

-      export_info = rpc.call_export_info(src_node, src_path)
+      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
......
    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
-      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                    self.op.hypervisor)
+      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
+                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
......
                                     (node, info['vg_free'], req_size))

    # os verification
-    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
+    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)
......
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
-    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
+    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))
......
    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
-        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
+        if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))
......
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
-        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
-                                           src_node, src_image, cluster_name):
+        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
+                                                src_node, src_image,
+                                                cluster_name):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
......
    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
-      if not rpc.call_instance_start(pnode_name, iobj, None):
+      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


......
    instance = self.instance
    node = instance.primary_node

-    node_insts = rpc.call_instance_list([node],
-                                        [instance.hypervisor])[node]
+    node_insts = self.rpc.call_instance_list([node],
+                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

......
    """Compute a new secondary node using an IAllocator.

    """
-    ial = IAllocator(self.cfg,
+    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])
......
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
-    results = rpc.call_vg_list([oth_node, tgt_node])
+    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
......
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
-        if not rpc.call_blockdev_find(node, dev):
+        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

......
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
-      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
+      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
......
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
-        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
+        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
-      if not rpc.call_blockdev_rename(tgt_node, rlist):
+      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
-      if not rpc.call_blockdev_rename(tgt_node, rlist):
+      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
......

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
-      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
+      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
-          if not rpc.call_blockdev_remove(tgt_node, new_lv):
+          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")
......
    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
-      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
+      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

......
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
-        if not rpc.call_blockdev_remove(tgt_node, lv):
+        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

......
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
-    results = rpc.call_vg_list([pri_node, new_node])
+    results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
......
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
-      if not rpc.call_blockdev_find(pri_node, dev):
+      if not self.rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

......
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
-      if not rpc.call_blockdev_shutdown(old_node, dev):
+      if not self.rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

......
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
-      if rpc.call_blockdev_find(pri_node, dev):
+      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
......
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
-      if not rpc.call_blockdev_find(pri_node, dev):
+      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

......
    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
-      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
+      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

......
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
-        if not rpc.call_blockdev_remove(old_node, lv):
+        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

......
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
-    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                  instance.hypervisor)
+    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
+                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
......
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
-      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
-      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+      if (not result or not isinstance(result, (list, tuple)) or
+          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
......

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
-    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
+    dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
......

    if snode:
      self.cfg.SetDiskID(dev, snode)
-      dev_sstatus = rpc.call_blockdev_find(snode, dev)
+      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

......
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
-      remote_info = rpc.call_instance_info(instance.primary_node,
-                                           instance.name,
-                                           instance.hypervisor)
+      remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                                instance.name,
+                                                instance.hypervisor)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
......
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
-      instance_info = rpc.call_instance_info(pnode, instance.name,
-                                             instance.hypervisor)
-      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(),
-                                    instance.hypervisor)
+      instance_info = self.rpc.call_instance_info(pnode, instance.name,
+                                                  instance.hypervisor)
+      nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
+                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
......
      that node.

    """
-    return rpc.call_export_list(self.nodes)
+    return self.rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
......
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
-      if not rpc.call_instance_shutdown(src_node, instance):
+      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

......
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
-          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
+          new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
......

    finally:
      if self.op.shutdown and instance.status == "up":
-        if not rpc.call_instance_start(src_node, instance, None):
+        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

......

    cluster_name = self.cfg.GetClusterName()
    for dev in snap_disks:
-      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
+      if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance, cluster_name):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
-      if not rpc.call_blockdev_remove(src_node, dev):
+      if not self.rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

-    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
+    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

......
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
-      exportlist = rpc.call_export_list(nodelist)
+      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
-          if not rpc.call_export_remove(node, instance.name):
+          if not self.rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))

......
      fqdn_warn = True
      instance_name = self.op.instance_name

-    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
+    exportlist = self.rpc.call_export_list(self.acquired_locks[
+      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
-        if not rpc.call_export_remove(node, instance_name):
+        if not self.rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

......
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
-      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
+      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
......
    "relocate_from",
    ]

-  def __init__(self, cfg, mode, name, **kwargs):
-    self.cfg = cfg
+  def __init__(self, lu, mode, name, **kwargs):
+    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
......
    This is the data that is independent of the actual operation.

    """
-    cfg = self.cfg
+    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
-      "cluster_name": self.cfg.GetClusterName(),
+      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
......
    node_list = cfg.GetNodeList()
5240 5248
    # FIXME: here we have only one hypervisor information, but
5241 5249
    # instance can belong to different hypervisors
5242
    node_data = rpc.call_node_info(node_list, cfg.GetVGName(),
5243
                                   cfg.GetHypervisorType())
5250
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5251
                                           cfg.GetHypervisorType())
5244 5252
    for nname in node_list:
5245 5253
      ninfo = cfg.GetNodeInfo(nname)
5246 5254
      if nname not in node_data or not isinstance(node_data[nname], dict):
......
5350 5358
    done.
5351 5359

  
5352 5360
    """
5353
    instance = self.cfg.GetInstanceInfo(self.name)
5361
    instance = self.lu.cfg.GetInstanceInfo(self.name)
5354 5362
    if instance is None:
5355 5363
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5356 5364
                                   " IAllocator" % self.name)
......
5389 5397

  
5390 5398
    self.in_text = serializer.Dump(self.in_data)
5391 5399

  
5392
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5400
  def Run(self, name, validate=True, call_fn=None):
5393 5401
    """Run an instance allocator and return the results.
5394 5402

  
5395 5403
    """
5404
    if call_fn is None:
5405
      call_fn = self.lu.rpc.call_iallocator_runner
5396 5406
    data = self.in_text
5397 5407

  
5398
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
5408
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5399 5409

  
5400 5410
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5401 5411
      raise errors.OpExecError("Invalid result from master iallocator runner")
......
5508 5518

  
5509 5519
    """
5510 5520
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5511
      ial = IAllocator(self.cfg,
5521
      ial = IAllocator(self,
5512 5522
                       mode=self.op.mode,
5513 5523
                       name=self.op.name,
5514 5524
                       mem_size=self.op.mem_size,
......
5520 5530
                       vcpus=self.op.vcpus,
5521 5531
                       )
5522 5532
    else:
5523
      ial = IAllocator(self.cfg,
5533
      ial = IAllocator(self,
5524 5534
                       mode=self.op.mode,
5525 5535
                       name=self.op.name,
5526 5536
                       relocate_from=list(self.relocate_from),
b/lib/config.py
    except ValueError:
      pass

-    result = rpc.call_upload_file(nodelist, self._cfg_file)
+    result = rpc.RpcRunner.call_upload_file(nodelist, self._cfg_file)
    for node in nodelist:
      if not result[node]:
        logging.error("copy of file %s to node %s failed",
b/lib/jqueue.py
from ganeti import jstore
from ganeti import rpc

+from ganeti.rpc import RpcRunner

JOBQUEUE_THREADS = 25

......
    assert node_name != self._my_hostname

    # Clean queue directory on added node
-    rpc.call_jobqueue_purge(node_name)
+    RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
......
      finally:
        fd.close()

-      result = rpc.call_jobqueue_update([node_name], file_name, content)
+      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

......
    """
    utils.WriteFile(file_name, data=data)

-    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
+    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

-    result = rpc.call_jobqueue_rename(self._nodes, old, new)
+    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

b/lib/mcpu.py
    self.context = context
    self._feedback_fn = None
    self.exclusive_BGL = False
+    self.rpc = rpc.RpcRunner(context.cfg)

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.
......
    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
-    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
+    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)
......
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
-      lu = lu_class(self, op, self.context)
+      lu = lu_class(self, op, self.context, self.rpc)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
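
With these hunks, the Processor becomes the single place where the RpcRunner is created: one runner per Processor, built from the cluster configuration and handed both to HooksMaster and to every LU. A condensed sketch of the wiring (the method name _BuildLU is invented for brevity):

class Processor(object):
  """Condensed sketch; the real class does locking, hooks and much more."""

  def __init__(self, context):
    self.context = context
    self.rpc = rpc.RpcRunner(context.cfg)   # one shared runner

  def _BuildLU(self, lu_class, op):
    # every LU now receives the shared runner as its fourth argument
    return lu_class(self, op, self.context, self.rpc)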
b/lib/rpc.py

"""

-# pylint: disable-msg=C0103
+# pylint: disable-msg=C0103,R0201,R0904
+# C0103: Invalid name, since call_ are not valid
+# R0201: Method could be a function, we keep all rpcs instance methods
+# as not to change them back and forth between static/instance methods
+# if they need to start using instance attributes
+# R0904: Too many public methods

import os
import socket
......
      self.results[node] = nc.get_response()


-def call_volume_list(node_list, vg_name):
-  """Gets the logical volumes present in a given volume group.
+class RpcRunner(object):
+  """RPC runner class"""

-  This is a multi-node call.
+  def __init__(self, cfg):
+    """Initialized the rpc runner.

-  """
-  c = Client("volume_list", [vg_name])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+    @type cfg:  C{config.ConfigWriter}
+    @param cfg: the configuration object that will be used to get data
+                about the cluster

+    """
+    self._cfg = cfg

-def call_vg_list(node_list):
-  """Gets the volume group list.
+  def call_volume_list(self, node_list, vg_name):
+    """Gets the logical volumes present in a given volume group.

-  This is a multi-node call.
+    This is a multi-node call.

-  """
-  c = Client("vg_list", [])
-  c.connect_list(node_list)
-  c.run()
-  return c.getresult()
+    """
+    c = Client("volume_list", [vg_name])
+    c.connect_list(node_list)
+    c.run()
+    return c.getresult()

+  def call_vg_list(self, node_list):
+    """Gets the volume group list.

-def call_bridges_exist(node, bridges_list):
-  """Checks if a node has all the bridges given.
+    This is a multi-node call.

-  This method checks if all bridges given in the bridges_list are
-  present on the remote node, so that an instance that uses interfaces
-  on those bridges can be started.
+    """
+    c = Client("vg_list", [])
+    c.connect_list(node_list)
+    c.run()
+    return c.getresult()

-  This is a single-node call.

-  """
-  c = Client("bridges_exist", [bridges_list])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_bridges_exist(self, node, bridges_list):
+    """Checks if a node has all the bridges given.

+    This method checks if all bridges given in the bridges_list are
+    present on the remote node, so that an instance that uses interfaces
+    on those bridges can be started.

-def call_instance_start(node, instance, extra_args):
-  """Starts an instance.
+    This is a single-node call.

-  This is a single-node call.
+    """
+    c = Client("bridges_exist", [bridges_list])
+    c.connect(node)
+    c.run()
+    return c.getresult().get(node, False)

-  """
-  c = Client("instance_start", [instance.ToDict(), extra_args])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)

+  def call_instance_start(self, node, instance, extra_args):
+    """Starts an instance.

-def call_instance_shutdown(node, instance):
-  """Stops an instance.
+    This is a single-node call.

-  This is a single-node call.
+    """
+    c = Client("instance_start", [instance.ToDict(), extra_args])
+    c.connect(node)
+    c.run()
+    return c.getresult().get(node, False)

-  """
-  c = Client("instance_shutdown", [instance.ToDict()])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)

+  def call_instance_shutdown(self, node, instance):
+    """Stops an instance.

-def call_instance_migrate(node, instance, target, live):
-  """Migrate an instance.
+    This is a single-node call.

-  This is a single-node call.
+    """
+    c = Client("instance_shutdown", [instance.ToDict()])
+    c.connect(node)
+    c.run()
+    return c.getresult().get(node, False)

-  @type node: string
-  @param node: the node on which the instance is currently running
-  @type instance: C{objects.Instance}
-  @param instance: the instance definition
-  @type target: string
-  @param target: the target node name
-  @type live: boolean
-  @param live: whether the migration should be done live or not (the
-      interpretation of this parameter is left to the hypervisor)

-  """
-  c = Client("instance_migrate", [instance.ToDict(), target, live])
-  c.connect(node)
-  c.run()
-  return c.getresult().get(node, False)
+  def call_instance_migrate(self, node, instance, target, live):
+    """Migrate an instance.

+    This is a single-node call.

-def call_instance_reboot(node, instance, reboot_type, extra_args):
-  """Reboots an instance.
+    @type node: string
+    @param node: the node on which the instance is currently running
+    @type instance: C{objects.Instance}
+    @param instance: the instance definition
+    @type target: string
+    @param target: the target node name
+    @type live: boolean
+    @param live: whether the migration should be done live or not (the
+        interpretation of this parameter is left to the hypervisor)

-  This is a single-node call.
+    """
+    c = Client("instance_migrate", [instance.ToDict(), target, live])
+    c.connect(node)
+    c.run()
+    return c.getresult().get(node, False)

-  """
-  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
  c.connect(node)
237
  c.run()
238
  return c.getresult().get(node, False)
239 245

  
246
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
247
    """Reboots an instance.
240 248

  
241
def call_instance_os_add(node, inst, osdev, swapdev):
242
  """Installs an OS on the given instance.
249
    This is a single-node call.
243 250

  
244
  This is a single-node call.
251
    """
252
    c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
253
    c.connect(node)
254
    c.run()
255
    return c.getresult().get(node, False)
245 256

  
246
  """
247
  params = [inst.ToDict(), osdev, swapdev]
248
  c = Client("instance_os_add", params)
249
  c.connect(node)
250
  c.run()
251
  return c.getresult().get(node, False)
252 257

  
258
  def call_instance_os_add(self, node, inst, osdev, swapdev):
259
    """Installs an OS on the given instance.
253 260

  
254
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
255
  """Run the OS rename script for an instance.
261
    This is a single-node call.
256 262

  
257
  This is a single-node call.
263
    """
264
    params = [inst.ToDict(), osdev, swapdev]
265
    c = Client("instance_os_add", params)
266
    c.connect(node)
267
    c.run()
268
    return c.getresult().get(node, False)
258 269

  
259
  """
260
  params = [inst.ToDict(), old_name, osdev, swapdev]
261
  c = Client("instance_run_rename", params)
262
  c.connect(node)
263
  c.run()
264
  return c.getresult().get(node, False)
265 270

  
271
  def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev):
272
    """Run the OS rename script for an instance.
266 273

  
267
def call_instance_info(node, instance, hname):
268
  """Returns information about a single instance.
274
    This is a single-node call.
269 275

  
270
  This is a single-node call.
276
    """
277
    params = [inst.ToDict(), old_name, osdev, swapdev]
278
    c = Client("instance_run_rename", params)
279
    c.connect(node)
280
    c.run()
281
    return c.getresult().get(node, False)
271 282

  
272
  @type node_list: list
273
  @param node_list: the list of nodes to query
274
  @type instance: string
275
  @param instance: the instance name
276
  @type hname: string
277
  @param hname: the hypervisor type of the instance
278 283

  
279
  """
280
  c = Client("instance_info", [instance])
281
  c.connect(node)
282
  c.run()
283
  return c.getresult().get(node, False)
284
  def call_instance_info(self, node, instance, hname):
285
    """Returns information about a single instance.
284 286

  
287
    This is a single-node call.
285 288

  
286
def call_all_instances_info(node_list, hypervisor_list):
287
  """Returns information about all instances on the given nodes.
289
    @type node: string
290
    @param node: the node to query
291
    @type instance: string
292
    @param instance: the instance name
293
    @type hname: string
294
    @param hname: the hypervisor type of the instance
288 295

  
289
  This is a multi-node call.
296
    """
297
    c = Client("instance_info", [instance])
298
    c.connect(node)
299
    c.run()
300
    return c.getresult().get(node, False)
290 301

  
291
  @type node_list: list
292
  @param node_list: the list of nodes to query
293
  @type hypervisor_list: list
294
  @param hypervisor_list: the hypervisors to query for instances
295 302

  
296
  """
297
  c = Client("all_instances_info", [hypervisor_list])
298
  c.connect_list(node_list)
299
  c.run()
300
  return c.getresult()
303
  def call_all_instances_info(self, node_list, hypervisor_list):
304
    """Returns information about all instances on the given nodes.
301 305

  
306
    This is a multi-node call.
302 307

  
303
def call_instance_list(node_list, hypervisor_list):
304
  """Returns the list of running instances on a given node.
308
    @type node_list: list
309
    @param node_list: the list of nodes to query
310
    @type hypervisor_list: list
311
    @param hypervisor_list: the hypervisors to query for instances
305 312

  
306
  This is a multi-node call.
313
    """
314
    c = Client("all_instances_info", [hypervisor_list])
315
    c.connect_list(node_list)
316
    c.run()
317
    return c.getresult()
307 318

  
308
  @type node_list: list
309
  @param node_list: the list of nodes to query
310
  @type hypervisor_list: list
311
  @param hypervisor_list: the hypervisors to query for instances
312 319

  
313
  """
314
  c = Client("instance_list", [hypervisor_list])
315
  c.connect_list(node_list)
316
  c.run()
317
  return c.getresult()
320
  def call_instance_list(self, node_list, hypervisor_list):
321
    """Returns the list of running instances on a given node.
318 322

  
323
    This is a multi-node call.
319 324

  
320
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
321
  """Do a TcpPing on the remote node
325
    @type node_list: list
326
    @param node_list: the list of nodes to query
327
    @type hypervisor_list: list
328
    @param hypervisor_list: the hypervisors to query for instances
322 329

  
323
  This is a single-node call.
324
  """
325
  c = Client("node_tcp_ping", [source, target, port, timeout,
326
                               live_port_needed])
327
  c.connect(node)
328
  c.run()
... This diff was truncated because it exceeds the maximum size that can be displayed.
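
Apart from the leading self argument and the extra level of indentation, the visible part of the rpc.py hunk keeps each call's body intact, and two shapes repeat throughout: multi-node calls connect_list() to several nodes and return the whole per-node result dict, while single-node calls connect() to one node and unwrap the dict with a False default. Below is a minimal, self-contained sketch of the two shapes; Client here is a stub standing in for the real ganeti.rpc.Client, which talks to the node daemons.

# Sketch of the two call shapes used throughout the new RpcRunner.
# Client below is a stub, not the real ganeti.rpc.Client.

class Client(object):
  def __init__(self, procedure, args):
    self.procedure = procedure
    self.args = args
    self.nodes = []

  def connect(self, node):
    self.nodes = [node]

  def connect_list(self, node_list):
    self.nodes = list(node_list)

  def run(self):
    pass  # the real client performs the remote requests here

  def getresult(self):
    # fake a per-node answer so the example is runnable
    return dict((node, True) for node in self.nodes)


class RpcRunner(object):
  def __init__(self, cfg):
    self._cfg = cfg

  def call_vg_list(self, node_list):
    """Multi-node call: the whole {node: result} dict is returned."""
    c = Client("vg_list", [])
    c.connect_list(node_list)
    c.run()
    return c.getresult()

  def call_bridges_exist(self, node, bridges_list):
    """Single-node call: the dict is unwrapped, defaulting to False."""
    c = Client("bridges_exist", [bridges_list])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)


runner = RpcRunner(cfg=None)  # cfg is unused by these two calls
print(runner.call_vg_list(["node1.example.com", "node2.example.com"]))
print(runner.call_bridges_exist("node1.example.com", ["xen-br0"]))

The cfg object stored in __init__ is not used by the methods shown here; as the broadened pylint disable (R0201) explains, the functions were nonetheless converted to instance methods so they will not have to flip between static and instance methods once they do need configuration data.
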
