Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 249069a1

History | View | Annotate | Download (145.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overriden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.proc = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    self.__ssh = None
78

    
79
    for attr_name in self._OP_REQP:
80
      attr_val = getattr(op, attr_name, None)
81
      if attr_val is None:
82
        raise errors.OpPrereqError("Required parameter '%s' missing" %
83
                                   attr_name)
84
    if self.REQ_CLUSTER:
85
      if not cfg.IsCluster():
86
        raise errors.OpPrereqError("Cluster not initialized yet,"
87
                                   " use 'gnt-cluster init' first.")
88
      if self.REQ_MASTER:
89
        master = sstore.GetMasterNode()
90
        if master != utils.HostInfo().name:
91
          raise errors.OpPrereqError("Commands must be run on the master"
92
                                     " node %s" % master)
93

    
94
  def __GetSSH(self):
95
    """Returns the SshRunner object
96

97
    """
98
    if not self.__ssh:
99
      self.__ssh = ssh.SshRunner(self.sstore)
100
    return self.__ssh
101

    
102
  ssh = property(fget=__GetSSH)
103

    
104
  def CheckPrereq(self):
105
    """Check prerequisites for this LU.
106

107
    This method should check that the prerequisites for the execution
108
    of this LU are fulfilled. It can do internode communication, but
109
    it should be idempotent - no cluster or system changes are
110
    allowed.
111

112
    The method should raise errors.OpPrereqError in case something is
113
    not fulfilled. Its return value is ignored.
114

115
    This method should also update all the parameters of the opcode to
116
    their canonical form; e.g. a short node name must be fully
117
    expanded after this method has successfully completed (so that
118
    hooks, logging, etc. work correctly).
119

120
    """
121
    raise NotImplementedError
122

    
123
  def Exec(self, feedback_fn):
124
    """Execute the LU.
125

126
    This method should implement the actual work. It should raise
127
    errors.OpExecError for failures that are somewhat dealt with in
128
    code, or expected.
129

130
    """
131
    raise NotImplementedError
132

    
133
  def BuildHooksEnv(self):
134
    """Build hooks environment for this LU.
135

136
    This method should return a three-node tuple consisting of: a dict
137
    containing the environment that will be used for running the
138
    specific hook for this LU, a list of node names on which the hook
139
    should run before the execution, and a list of node names on which
140
    the hook should run after the execution.
141

142
    The keys of the dict must not have 'GANETI_' prefixed as this will
143
    be handled in the hooks runner. Also note additional keys will be
144
    added by the hooks runner. If the LU doesn't define any
145
    environment, an empty dict (and not None) should be returned.
146

147
    As for the node lists, the master should not be included in the
148
    them, as it will be added by the hooks runner in case this LU
149
    requires a cluster to run on (otherwise we don't have a node
150
    list). No nodes should be returned as an empty list (and not
151
    None).
152

153
    Note that if the HPATH for a LU class is None, this function will
154
    not be called.
155

156
    """
157
    raise NotImplementedError
158

    
159

    
160
class NoHooksLU(LogicalUnit):
161
  """Simple LU which runs no hooks.
162

163
  This LU is intended as a parent for other LogicalUnits which will
164
  run no hooks, in order to reduce duplicate code.
165

166
  """
167
  HPATH = None
168
  HTYPE = None
169

    
170
  def BuildHooksEnv(self):
171
    """Build hooks env.
172

173
    This is a no-op, since we don't run hooks.
174

175
    """
176
    return {}, [], []
177

    
178

    
179
def _AddHostToEtcHosts(hostname):
180
  """Wrapper around utils.SetEtcHostsEntry.
181

182
  """
183
  hi = utils.HostInfo(name=hostname)
184
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
185

    
186

    
187
def _RemoveHostFromEtcHosts(hostname):
188
  """Wrapper around utils.RemoveEtcHostsEntry.
189

190
  """
191
  hi = utils.HostInfo(name=hostname)
192
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
193
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
194

    
195

    
196
def _GetWantedNodes(lu, nodes):
197
  """Returns list of checked and expanded node names.
198

199
  Args:
200
    nodes: List of nodes (strings) or None for all
201

202
  """
203
  if not isinstance(nodes, list):
204
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
205

    
206
  if nodes:
207
    wanted = []
208

    
209
    for name in nodes:
210
      node = lu.cfg.ExpandNodeName(name)
211
      if node is None:
212
        raise errors.OpPrereqError("No such node name '%s'" % name)
213
      wanted.append(node)
214

    
215
  else:
216
    wanted = lu.cfg.GetNodeList()
217
  return utils.NiceSort(wanted)
218

    
219

    
220
def _GetWantedInstances(lu, instances):
221
  """Returns list of checked and expanded instance names.
222

223
  Args:
224
    instances: List of instances (strings) or None for all
225

226
  """
227
  if not isinstance(instances, list):
228
    raise errors.OpPrereqError("Invalid argument type 'instances'")
229

    
230
  if instances:
231
    wanted = []
232

    
233
    for name in instances:
234
      instance = lu.cfg.ExpandInstanceName(name)
235
      if instance is None:
236
        raise errors.OpPrereqError("No such instance name '%s'" % name)
237
      wanted.append(instance)
238

    
239
  else:
240
    wanted = lu.cfg.GetInstanceList()
241
  return utils.NiceSort(wanted)
242

    
243

    
244
def _CheckOutputFields(static, dynamic, selected):
245
  """Checks whether all selected fields are valid.
246

247
  Args:
248
    static: Static fields
249
    dynamic: Dynamic fields
250

251
  """
252
  static_fields = frozenset(static)
253
  dynamic_fields = frozenset(dynamic)
254

    
255
  all_fields = static_fields | dynamic_fields
256

    
257
  if not all_fields.issuperset(selected):
258
    raise errors.OpPrereqError("Unknown output fields selected: %s"
259
                               % ",".join(frozenset(selected).
260
                                          difference(all_fields)))
261

    
262

    
263
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
264
                          memory, vcpus, nics):
265
  """Builds instance related env variables for hooks from single variables.
266

267
  Args:
268
    secondary_nodes: List of secondary nodes as strings
269
  """
270
  env = {
271
    "OP_TARGET": name,
272
    "INSTANCE_NAME": name,
273
    "INSTANCE_PRIMARY": primary_node,
274
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
275
    "INSTANCE_OS_TYPE": os_type,
276
    "INSTANCE_STATUS": status,
277
    "INSTANCE_MEMORY": memory,
278
    "INSTANCE_VCPUS": vcpus,
279
  }
280

    
281
  if nics:
282
    nic_count = len(nics)
283
    for idx, (ip, bridge, mac) in enumerate(nics):
284
      if ip is None:
285
        ip = ""
286
      env["INSTANCE_NIC%d_IP" % idx] = ip
287
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
288
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
289
  else:
290
    nic_count = 0
291

    
292
  env["INSTANCE_NIC_COUNT"] = nic_count
293

    
294
  return env
295

    
296

    
297
def _BuildInstanceHookEnvByObject(instance, override=None):
298
  """Builds instance related env variables for hooks from an object.
299

300
  Args:
301
    instance: objects.Instance object of instance
302
    override: dict of values to override
303
  """
304
  args = {
305
    'name': instance.name,
306
    'primary_node': instance.primary_node,
307
    'secondary_nodes': instance.secondary_nodes,
308
    'os_type': instance.os,
309
    'status': instance.os,
310
    'memory': instance.memory,
311
    'vcpus': instance.vcpus,
312
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
313
  }
314
  if override:
315
    args.update(override)
316
  return _BuildInstanceHookEnv(**args)
317

    
318

    
319
def _HasValidVG(vglist, vgname):
320
  """Checks if the volume group list is valid.
321

322
  A non-None return value means there's an error, and the return value
323
  is the error message.
324

325
  """
326
  vgsize = vglist.get(vgname, None)
327
  if vgsize is None:
328
    return "volume group '%s' missing" % vgname
329
  elif vgsize < 20480:
330
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
331
            (vgname, vgsize))
332
  return None
333

    
334

    
335
def _InitSSHSetup(node):
336
  """Setup the SSH configuration for the cluster.
337

338

339
  This generates a dsa keypair for root, adds the pub key to the
340
  permitted hosts and adds the hostkey to its own known hosts.
341

342
  Args:
343
    node: the name of this host as a fqdn
344

345
  """
346
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
347

    
348
  for name in priv_key, pub_key:
349
    if os.path.exists(name):
350
      utils.CreateBackup(name)
351
    utils.RemoveFile(name)
352

    
353
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
354
                         "-f", priv_key,
355
                         "-q", "-N", ""])
356
  if result.failed:
357
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
358
                             result.output)
359

    
360
  f = open(pub_key, 'r')
361
  try:
362
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
363
  finally:
364
    f.close()
365

    
366

    
367
def _InitGanetiServerSetup(ss):
368
  """Setup the necessary configuration for the initial node daemon.
369

370
  This creates the nodepass file containing the shared password for
371
  the cluster and also generates the SSL certificate.
372

373
  """
374
  # Create pseudo random password
375
  randpass = sha.new(os.urandom(64)).hexdigest()
376
  # and write it into sstore
377
  ss.SetKey(ss.SS_NODED_PASS, randpass)
378

    
379
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
380
                         "-days", str(365*5), "-nodes", "-x509",
381
                         "-keyout", constants.SSL_CERT_FILE,
382
                         "-out", constants.SSL_CERT_FILE, "-batch"])
383
  if result.failed:
384
    raise errors.OpExecError("could not generate server ssl cert, command"
385
                             " %s had exitcode %s and error message %s" %
386
                             (result.cmd, result.exit_code, result.output))
387

    
388
  os.chmod(constants.SSL_CERT_FILE, 0400)
389

    
390
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
391

    
392
  if result.failed:
393
    raise errors.OpExecError("Could not start the node daemon, command %s"
394
                             " had exitcode %s and error %s" %
395
                             (result.cmd, result.exit_code, result.output))
396

    
397

    
398
def _CheckInstanceBridgesExist(instance):
399
  """Check that the brigdes needed by an instance exist.
400

401
  """
402
  # check bridges existance
403
  brlist = [nic.bridge for nic in instance.nics]
404
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
405
    raise errors.OpPrereqError("one or more target bridges %s does not"
406
                               " exist on destination node '%s'" %
407
                               (brlist, instance.primary_node))
408

    
409

    
410
class LUInitCluster(LogicalUnit):
411
  """Initialise the cluster.
412

413
  """
414
  HPATH = "cluster-init"
415
  HTYPE = constants.HTYPE_CLUSTER
416
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
417
              "def_bridge", "master_netdev", "file_storage_dir"]
418
  REQ_CLUSTER = False
419

    
420
  def BuildHooksEnv(self):
421
    """Build hooks env.
422

423
    Notes: Since we don't require a cluster, we must manually add
424
    ourselves in the post-run node list.
425

426
    """
427
    env = {"OP_TARGET": self.op.cluster_name}
428
    return env, [], [self.hostname.name]
429

    
430
  def CheckPrereq(self):
431
    """Verify that the passed name is a valid one.
432

433
    """
434
    if config.ConfigWriter.IsCluster():
435
      raise errors.OpPrereqError("Cluster is already initialised")
436

    
437
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
438
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
439
        raise errors.OpPrereqError("Please prepare the cluster VNC"
440
                                   "password file %s" %
441
                                   constants.VNC_PASSWORD_FILE)
442

    
443
    self.hostname = hostname = utils.HostInfo()
444

    
445
    if hostname.ip.startswith("127."):
446
      raise errors.OpPrereqError("This host's IP resolves to the private"
447
                                 " range (%s). Please fix DNS or %s." %
448
                                 (hostname.ip, constants.ETC_HOSTS))
449

    
450
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
451
                         source=constants.LOCALHOST_IP_ADDRESS):
452
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
453
                                 " to %s,\nbut this ip address does not"
454
                                 " belong to this host."
455
                                 " Aborting." % hostname.ip)
456

    
457
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
458

    
459
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
460
                     timeout=5):
461
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
462

    
463
    secondary_ip = getattr(self.op, "secondary_ip", None)
464
    if secondary_ip and not utils.IsValidIP(secondary_ip):
465
      raise errors.OpPrereqError("Invalid secondary ip given")
466
    if (secondary_ip and
467
        secondary_ip != hostname.ip and
468
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
469
                           source=constants.LOCALHOST_IP_ADDRESS))):
470
      raise errors.OpPrereqError("You gave %s as secondary IP,"
471
                                 " but it does not belong to this host." %
472
                                 secondary_ip)
473
    self.secondary_ip = secondary_ip
474

    
475
    # checks presence of the volume group given
476
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
477

    
478
    if vgstatus:
479
      raise errors.OpPrereqError("Error: %s" % vgstatus)
480

    
481
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
482

    
483
    if not os.path.isabs(self.op.file_storage_dir):
484
      raise errors.OpPrereqError("The file storage directory you have is"
485
                                 " not an absolute path.")
486

    
487
    if not os.path.exists(self.op.file_storage_dir):
488
      try:
489
        os.makedirs(self.op.file_storage_dir, 0750)
490
      except OSError, err:
491
        raise errors.OpPrereqError("Cannot create file storage directory"
492
                                   " '%s': %s" %
493
                                   (self.op.file_storage_dir, err))
494

    
495
    if not os.path.isdir(self.op.file_storage_dir):
496
      raise errors.OpPrereqError("The file storage directory '%s' is not"
497
                                 " a directory." % self.op.file_storage_dir)
498

    
499
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
500
                    self.op.mac_prefix):
501
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
502
                                 self.op.mac_prefix)
503

    
504
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
505
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
506
                                 self.op.hypervisor_type)
507

    
508
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
509
    if result.failed:
510
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
511
                                 (self.op.master_netdev,
512
                                  result.output.strip()))
513

    
514
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
515
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
516
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
517
                                 " executable." % constants.NODE_INITD_SCRIPT)
518

    
519
  def Exec(self, feedback_fn):
520
    """Initialize the cluster.
521

522
    """
523
    clustername = self.clustername
524
    hostname = self.hostname
525

    
526
    # set up the simple store
527
    self.sstore = ss = ssconf.SimpleStore()
528
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
529
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
530
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
531
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
532
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
533
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
534

    
535
    # set up the inter-node password and certificate
536
    _InitGanetiServerSetup(ss)
537

    
538
    # start the master ip
539
    rpc.call_node_start_master(hostname.name)
540

    
541
    # set up ssh config and /etc/hosts
542
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
543
    try:
544
      sshline = f.read()
545
    finally:
546
      f.close()
547
    sshkey = sshline.split(" ")[1]
548

    
549
    _AddHostToEtcHosts(hostname.name)
550
    _InitSSHSetup(hostname.name)
551

    
552
    # init of cluster config file
553
    self.cfg = cfgw = config.ConfigWriter()
554
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
555
                    sshkey, self.op.mac_prefix,
556
                    self.op.vg_name, self.op.def_bridge)
557

    
558
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
559

    
560

    
561
class LUDestroyCluster(NoHooksLU):
562
  """Logical unit for destroying the cluster.
563

564
  """
565
  _OP_REQP = []
566

    
567
  def CheckPrereq(self):
568
    """Check prerequisites.
569

570
    This checks whether the cluster is empty.
571

572
    Any errors are signalled by raising errors.OpPrereqError.
573

574
    """
575
    master = self.sstore.GetMasterNode()
576

    
577
    nodelist = self.cfg.GetNodeList()
578
    if len(nodelist) != 1 or nodelist[0] != master:
579
      raise errors.OpPrereqError("There are still %d node(s) in"
580
                                 " this cluster." % (len(nodelist) - 1))
581
    instancelist = self.cfg.GetInstanceList()
582
    if instancelist:
583
      raise errors.OpPrereqError("There are still %d instance(s) in"
584
                                 " this cluster." % len(instancelist))
585

    
586
  def Exec(self, feedback_fn):
587
    """Destroys the cluster.
588

589
    """
590
    master = self.sstore.GetMasterNode()
591
    if not rpc.call_node_stop_master(master):
592
      raise errors.OpExecError("Could not disable the master role")
593
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594
    utils.CreateBackup(priv_key)
595
    utils.CreateBackup(pub_key)
596
    rpc.call_node_leave_cluster(master)
597

    
598

    
599
class LUVerifyCluster(NoHooksLU):
600
  """Verifies the cluster status.
601

602
  """
603
  _OP_REQP = []
604

    
605
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
606
                  remote_version, feedback_fn):
607
    """Run multiple tests against a node.
608

609
    Test list:
610
      - compares ganeti version
611
      - checks vg existance and size > 20G
612
      - checks config file checksum
613
      - checks ssh to other nodes
614

615
    Args:
616
      node: name of the node to check
617
      file_list: required list of files
618
      local_cksum: dictionary of local files and their checksums
619

620
    """
621
    # compares ganeti version
622
    local_version = constants.PROTOCOL_VERSION
623
    if not remote_version:
624
      feedback_fn(" - ERROR: connection to %s failed" % (node))
625
      return True
626

    
627
    if local_version != remote_version:
628
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
629
                      (local_version, node, remote_version))
630
      return True
631

    
632
    # checks vg existance and size > 20G
633

    
634
    bad = False
635
    if not vglist:
636
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
637
                      (node,))
638
      bad = True
639
    else:
640
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
641
      if vgstatus:
642
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
643
        bad = True
644

    
645
    # checks config file checksum
646
    # checks ssh to any
647

    
648
    if 'filelist' not in node_result:
649
      bad = True
650
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
651
    else:
652
      remote_cksum = node_result['filelist']
653
      for file_name in file_list:
654
        if file_name not in remote_cksum:
655
          bad = True
656
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
657
        elif remote_cksum[file_name] != local_cksum[file_name]:
658
          bad = True
659
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
660

    
661
    if 'nodelist' not in node_result:
662
      bad = True
663
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
664
    else:
665
      if node_result['nodelist']:
666
        bad = True
667
        for node in node_result['nodelist']:
668
          feedback_fn("  - ERROR: communication with node '%s': %s" %
669
                          (node, node_result['nodelist'][node]))
670
    hyp_result = node_result.get('hypervisor', None)
671
    if hyp_result is not None:
672
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
673
    return bad
674

    
675
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
676
    """Verify an instance.
677

678
    This function checks to see if the required block devices are
679
    available on the instance's node.
680

681
    """
682
    bad = False
683

    
684
    instancelist = self.cfg.GetInstanceList()
685
    if not instance in instancelist:
686
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
687
                      (instance, instancelist))
688
      bad = True
689

    
690
    instanceconfig = self.cfg.GetInstanceInfo(instance)
691
    node_current = instanceconfig.primary_node
692

    
693
    node_vol_should = {}
694
    instanceconfig.MapLVsByNode(node_vol_should)
695

    
696
    for node in node_vol_should:
697
      for volume in node_vol_should[node]:
698
        if node not in node_vol_is or volume not in node_vol_is[node]:
699
          feedback_fn("  - ERROR: volume %s missing on node %s" %
700
                          (volume, node))
701
          bad = True
702

    
703
    if not instanceconfig.status == 'down':
704
      if not instance in node_instance[node_current]:
705
        feedback_fn("  - ERROR: instance %s not running on node %s" %
706
                        (instance, node_current))
707
        bad = True
708

    
709
    for node in node_instance:
710
      if (not node == node_current):
711
        if instance in node_instance[node]:
712
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
713
                          (instance, node))
714
          bad = True
715

    
716
    return bad
717

    
718
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
719
    """Verify if there are any unknown volumes in the cluster.
720

721
    The .os, .swap and backup volumes are ignored. All other volumes are
722
    reported as unknown.
723

724
    """
725
    bad = False
726

    
727
    for node in node_vol_is:
728
      for volume in node_vol_is[node]:
729
        if node not in node_vol_should or volume not in node_vol_should[node]:
730
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
731
                      (volume, node))
732
          bad = True
733
    return bad
734

    
735
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
736
    """Verify the list of running instances.
737

738
    This checks what instances are running but unknown to the cluster.
739

740
    """
741
    bad = False
742
    for node in node_instance:
743
      for runninginstance in node_instance[node]:
744
        if runninginstance not in instancelist:
745
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
746
                          (runninginstance, node))
747
          bad = True
748
    return bad
749

    
750
  def CheckPrereq(self):
751
    """Check prerequisites.
752

753
    This has no prerequisites.
754

755
    """
756
    pass
757

    
758
  def Exec(self, feedback_fn):
759
    """Verify integrity of cluster, performing various test on nodes.
760

761
    """
762
    bad = False
763
    feedback_fn("* Verifying global settings")
764
    for msg in self.cfg.VerifyConfig():
765
      feedback_fn("  - ERROR: %s" % msg)
766

    
767
    vg_name = self.cfg.GetVGName()
768
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
769
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
770
    node_volume = {}
771
    node_instance = {}
772

    
773
    # FIXME: verify OS list
774
    # do local checksums
775
    file_names = list(self.sstore.GetFileList())
776
    file_names.append(constants.SSL_CERT_FILE)
777
    file_names.append(constants.CLUSTER_CONF_FILE)
778
    local_checksums = utils.FingerprintFiles(file_names)
779

    
780
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
781
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
782
    all_instanceinfo = rpc.call_instance_list(nodelist)
783
    all_vglist = rpc.call_vg_list(nodelist)
784
    node_verify_param = {
785
      'filelist': file_names,
786
      'nodelist': nodelist,
787
      'hypervisor': None,
788
      }
789
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
790
    all_rversion = rpc.call_version(nodelist)
791

    
792
    for node in nodelist:
793
      feedback_fn("* Verifying node %s" % node)
794
      result = self._VerifyNode(node, file_names, local_checksums,
795
                                all_vglist[node], all_nvinfo[node],
796
                                all_rversion[node], feedback_fn)
797
      bad = bad or result
798

    
799
      # node_volume
800
      volumeinfo = all_volumeinfo[node]
801

    
802
      if isinstance(volumeinfo, basestring):
803
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
804
                    (node, volumeinfo[-400:].encode('string_escape')))
805
        bad = True
806
        node_volume[node] = {}
807
      elif not isinstance(volumeinfo, dict):
808
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
809
        bad = True
810
        continue
811
      else:
812
        node_volume[node] = volumeinfo
813

    
814
      # node_instance
815
      nodeinstance = all_instanceinfo[node]
816
      if type(nodeinstance) != list:
817
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
818
        bad = True
819
        continue
820

    
821
      node_instance[node] = nodeinstance
822

    
823
    node_vol_should = {}
824

    
825
    for instance in instancelist:
826
      feedback_fn("* Verifying instance %s" % instance)
827
      result =  self._VerifyInstance(instance, node_volume, node_instance,
828
                                     feedback_fn)
829
      bad = bad or result
830

    
831
      inst_config = self.cfg.GetInstanceInfo(instance)
832

    
833
      inst_config.MapLVsByNode(node_vol_should)
834

    
835
    feedback_fn("* Verifying orphan volumes")
836
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
837
                                       feedback_fn)
838
    bad = bad or result
839

    
840
    feedback_fn("* Verifying remaining instances")
841
    result = self._VerifyOrphanInstances(instancelist, node_instance,
842
                                         feedback_fn)
843
    bad = bad or result
844

    
845
    return int(bad)
846

    
847

    
848
class LUVerifyDisks(NoHooksLU):
849
  """Verifies the cluster disks status.
850

851
  """
852
  _OP_REQP = []
853

    
854
  def CheckPrereq(self):
855
    """Check prerequisites.
856

857
    This has no prerequisites.
858

859
    """
860
    pass
861

    
862
  def Exec(self, feedback_fn):
863
    """Verify integrity of cluster disks.
864

865
    """
866
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
867

    
868
    vg_name = self.cfg.GetVGName()
869
    nodes = utils.NiceSort(self.cfg.GetNodeList())
870
    instances = [self.cfg.GetInstanceInfo(name)
871
                 for name in self.cfg.GetInstanceList()]
872

    
873
    nv_dict = {}
874
    for inst in instances:
875
      inst_lvs = {}
876
      if (inst.status != "up" or
877
          inst.disk_template not in constants.DTS_NET_MIRROR):
878
        continue
879
      inst.MapLVsByNode(inst_lvs)
880
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
881
      for node, vol_list in inst_lvs.iteritems():
882
        for vol in vol_list:
883
          nv_dict[(node, vol)] = inst
884

    
885
    if not nv_dict:
886
      return result
887

    
888
    node_lvs = rpc.call_volume_list(nodes, vg_name)
889

    
890
    to_act = set()
891
    for node in nodes:
892
      # node_volume
893
      lvs = node_lvs[node]
894

    
895
      if isinstance(lvs, basestring):
896
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
897
        res_nlvm[node] = lvs
898
      elif not isinstance(lvs, dict):
899
        logger.Info("connection to node %s failed or invalid data returned" %
900
                    (node,))
901
        res_nodes.append(node)
902
        continue
903

    
904
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
905
        inst = nv_dict.pop((node, lv_name), None)
906
        if (not lv_online and inst is not None
907
            and inst.name not in res_instances):
908
          res_instances.append(inst.name)
909

    
910
    # any leftover items in nv_dict are missing LVs, let's arrange the
911
    # data better
912
    for key, inst in nv_dict.iteritems():
913
      if inst.name not in res_missing:
914
        res_missing[inst.name] = []
915
      res_missing[inst.name].append(key)
916

    
917
    return result
918

    
919

    
920
class LURenameCluster(LogicalUnit):
921
  """Rename the cluster.
922

923
  """
924
  HPATH = "cluster-rename"
925
  HTYPE = constants.HTYPE_CLUSTER
926
  _OP_REQP = ["name"]
927

    
928
  def BuildHooksEnv(self):
929
    """Build hooks env.
930

931
    """
932
    env = {
933
      "OP_TARGET": self.sstore.GetClusterName(),
934
      "NEW_NAME": self.op.name,
935
      }
936
    mn = self.sstore.GetMasterNode()
937
    return env, [mn], [mn]
938

    
939
  def CheckPrereq(self):
940
    """Verify that the passed name is a valid one.
941

942
    """
943
    hostname = utils.HostInfo(self.op.name)
944

    
945
    new_name = hostname.name
946
    self.ip = new_ip = hostname.ip
947
    old_name = self.sstore.GetClusterName()
948
    old_ip = self.sstore.GetMasterIP()
949
    if new_name == old_name and new_ip == old_ip:
950
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
951
                                 " cluster has changed")
952
    if new_ip != old_ip:
953
      result = utils.RunCmd(["fping", "-q", new_ip])
954
      if not result.failed:
955
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
956
                                   " reachable on the network. Aborting." %
957
                                   new_ip)
958

    
959
    self.op.name = new_name
960

    
961
  def Exec(self, feedback_fn):
962
    """Rename the cluster.
963

964
    """
965
    clustername = self.op.name
966
    ip = self.ip
967
    ss = self.sstore
968

    
969
    # shutdown the master IP
970
    master = ss.GetMasterNode()
971
    if not rpc.call_node_stop_master(master):
972
      raise errors.OpExecError("Could not disable the master role")
973

    
974
    try:
975
      # modify the sstore
976
      ss.SetKey(ss.SS_MASTER_IP, ip)
977
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
978

    
979
      # Distribute updated ss config to all nodes
980
      myself = self.cfg.GetNodeInfo(master)
981
      dist_nodes = self.cfg.GetNodeList()
982
      if myself.name in dist_nodes:
983
        dist_nodes.remove(myself.name)
984

    
985
      logger.Debug("Copying updated ssconf data to all nodes")
986
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
987
        fname = ss.KeyToFilename(keyname)
988
        result = rpc.call_upload_file(dist_nodes, fname)
989
        for to_node in dist_nodes:
990
          if not result[to_node]:
991
            logger.Error("copy of file %s to node %s failed" %
992
                         (fname, to_node))
993
    finally:
994
      if not rpc.call_node_start_master(master):
995
        logger.Error("Could not re-enable the master role on the master,"
996
                     " please restart manually.")
997

    
998

    
999
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1000
  """Sleep and poll for an instance's disk to sync.
1001

1002
  """
1003
  if not instance.disks:
1004
    return True
1005

    
1006
  if not oneshot:
1007
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1008

    
1009
  node = instance.primary_node
1010

    
1011
  for dev in instance.disks:
1012
    cfgw.SetDiskID(dev, node)
1013

    
1014
  retries = 0
1015
  while True:
1016
    max_time = 0
1017
    done = True
1018
    cumul_degraded = False
1019
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1020
    if not rstats:
1021
      proc.LogWarning("Can't get any data from node %s" % node)
1022
      retries += 1
1023
      if retries >= 10:
1024
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1025
                                 " aborting." % node)
1026
      time.sleep(6)
1027
      continue
1028
    retries = 0
1029
    for i in range(len(rstats)):
1030
      mstat = rstats[i]
1031
      if mstat is None:
1032
        proc.LogWarning("Can't compute data for node %s/%s" %
1033
                        (node, instance.disks[i].iv_name))
1034
        continue
1035
      # we ignore the ldisk parameter
1036
      perc_done, est_time, is_degraded, _ = mstat
1037
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1038
      if perc_done is not None:
1039
        done = False
1040
        if est_time is not None:
1041
          rem_time = "%d estimated seconds remaining" % est_time
1042
          max_time = est_time
1043
        else:
1044
          rem_time = "no time estimate"
1045
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1046
                     (instance.disks[i].iv_name, perc_done, rem_time))
1047
    if done or oneshot:
1048
      break
1049

    
1050
    if unlock:
1051
      utils.Unlock('cmd')
1052
    try:
1053
      time.sleep(min(60, max_time))
1054
    finally:
1055
      if unlock:
1056
        utils.Lock('cmd')
1057

    
1058
  if done:
1059
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1060
  return not cumul_degraded
1061

    
1062

    
1063
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1064
  """Check that mirrors are not degraded.
1065

1066
  The ldisk parameter, if True, will change the test from the
1067
  is_degraded attribute (which represents overall non-ok status for
1068
  the device(s)) to the ldisk (representing the local storage status).
1069

1070
  """
1071
  cfgw.SetDiskID(dev, node)
1072
  if ldisk:
1073
    idx = 6
1074
  else:
1075
    idx = 5
1076

    
1077
  result = True
1078
  if on_primary or dev.AssembleOnSecondary():
1079
    rstats = rpc.call_blockdev_find(node, dev)
1080
    if not rstats:
1081
      logger.ToStderr("Can't get any data from node %s" % node)
1082
      result = False
1083
    else:
1084
      result = result and (not rstats[idx])
1085
  if dev.children:
1086
    for child in dev.children:
1087
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1088

    
1089
  return result
1090

    
1091

    
1092
class LUDiagnoseOS(NoHooksLU):
1093
  """Logical unit for OS diagnose/query.
1094

1095
  """
1096
  _OP_REQP = []
1097

    
1098
  def CheckPrereq(self):
1099
    """Check prerequisites.
1100

1101
    This always succeeds, since this is a pure query LU.
1102

1103
    """
1104
    return
1105

    
1106
  def Exec(self, feedback_fn):
1107
    """Compute the list of OSes.
1108

1109
    """
1110
    node_list = self.cfg.GetNodeList()
1111
    node_data = rpc.call_os_diagnose(node_list)
1112
    if node_data == False:
1113
      raise errors.OpExecError("Can't gather the list of OSes")
1114
    return node_data
1115

    
1116

    
1117
class LURemoveNode(LogicalUnit):
1118
  """Logical unit for removing a node.
1119

1120
  """
1121
  HPATH = "node-remove"
1122
  HTYPE = constants.HTYPE_NODE
1123
  _OP_REQP = ["node_name"]
1124

    
1125
  def BuildHooksEnv(self):
1126
    """Build hooks env.
1127

1128
    This doesn't run on the target node in the pre phase as a failed
1129
    node would not allows itself to run.
1130

1131
    """
1132
    env = {
1133
      "OP_TARGET": self.op.node_name,
1134
      "NODE_NAME": self.op.node_name,
1135
      }
1136
    all_nodes = self.cfg.GetNodeList()
1137
    all_nodes.remove(self.op.node_name)
1138
    return env, all_nodes, all_nodes
1139

    
1140
  def CheckPrereq(self):
1141
    """Check prerequisites.
1142

1143
    This checks:
1144
     - the node exists in the configuration
1145
     - it does not have primary or secondary instances
1146
     - it's not the master
1147

1148
    Any errors are signalled by raising errors.OpPrereqError.
1149

1150
    """
1151
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1152
    if node is None:
1153
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1154

    
1155
    instance_list = self.cfg.GetInstanceList()
1156

    
1157
    masternode = self.sstore.GetMasterNode()
1158
    if node.name == masternode:
1159
      raise errors.OpPrereqError("Node is the master node,"
1160
                                 " you need to failover first.")
1161

    
1162
    for instance_name in instance_list:
1163
      instance = self.cfg.GetInstanceInfo(instance_name)
1164
      if node.name == instance.primary_node:
1165
        raise errors.OpPrereqError("Instance %s still running on the node,"
1166
                                   " please remove first." % instance_name)
1167
      if node.name in instance.secondary_nodes:
1168
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1169
                                   " please remove first." % instance_name)
1170
    self.op.node_name = node.name
1171
    self.node = node
1172

    
1173
  def Exec(self, feedback_fn):
1174
    """Removes the node from the cluster.
1175

1176
    """
1177
    node = self.node
1178
    logger.Info("stopping the node daemon and removing configs from node %s" %
1179
                node.name)
1180

    
1181
    rpc.call_node_leave_cluster(node.name)
1182

    
1183
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1184

    
1185
    logger.Info("Removing node %s from config" % node.name)
1186

    
1187
    self.cfg.RemoveNode(node.name)
1188

    
1189
    _RemoveHostFromEtcHosts(node.name)
1190

    
1191

    
1192
class LUQueryNodes(NoHooksLU):
1193
  """Logical unit for querying nodes.
1194

1195
  """
1196
  _OP_REQP = ["output_fields", "names"]
1197

    
1198
  def CheckPrereq(self):
1199
    """Check prerequisites.
1200

1201
    This checks that the fields required are valid output fields.
1202

1203
    """
1204
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1205
                                     "mtotal", "mnode", "mfree",
1206
                                     "bootid"])
1207

    
1208
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1209
                               "pinst_list", "sinst_list",
1210
                               "pip", "sip"],
1211
                       dynamic=self.dynamic_fields,
1212
                       selected=self.op.output_fields)
1213

    
1214
    self.wanted = _GetWantedNodes(self, self.op.names)
1215

    
1216
  def Exec(self, feedback_fn):
1217
    """Computes the list of nodes and their attributes.
1218

1219
    """
1220
    nodenames = self.wanted
1221
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1222

    
1223
    # begin data gathering
1224

    
1225
    if self.dynamic_fields.intersection(self.op.output_fields):
1226
      live_data = {}
1227
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1228
      for name in nodenames:
1229
        nodeinfo = node_data.get(name, None)
1230
        if nodeinfo:
1231
          live_data[name] = {
1232
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1233
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1234
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1235
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1236
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1237
            "bootid": nodeinfo['bootid'],
1238
            }
1239
        else:
1240
          live_data[name] = {}
1241
    else:
1242
      live_data = dict.fromkeys(nodenames, {})
1243

    
1244
    node_to_primary = dict([(name, set()) for name in nodenames])
1245
    node_to_secondary = dict([(name, set()) for name in nodenames])
1246

    
1247
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1248
                             "sinst_cnt", "sinst_list"))
1249
    if inst_fields & frozenset(self.op.output_fields):
1250
      instancelist = self.cfg.GetInstanceList()
1251

    
1252
      for instance_name in instancelist:
1253
        inst = self.cfg.GetInstanceInfo(instance_name)
1254
        if inst.primary_node in node_to_primary:
1255
          node_to_primary[inst.primary_node].add(inst.name)
1256
        for secnode in inst.secondary_nodes:
1257
          if secnode in node_to_secondary:
1258
            node_to_secondary[secnode].add(inst.name)
1259

    
1260
    # end data gathering
1261

    
1262
    output = []
1263
    for node in nodelist:
1264
      node_output = []
1265
      for field in self.op.output_fields:
1266
        if field == "name":
1267
          val = node.name
1268
        elif field == "pinst_list":
1269
          val = list(node_to_primary[node.name])
1270
        elif field == "sinst_list":
1271
          val = list(node_to_secondary[node.name])
1272
        elif field == "pinst_cnt":
1273
          val = len(node_to_primary[node.name])
1274
        elif field == "sinst_cnt":
1275
          val = len(node_to_secondary[node.name])
1276
        elif field == "pip":
1277
          val = node.primary_ip
1278
        elif field == "sip":
1279
          val = node.secondary_ip
1280
        elif field in self.dynamic_fields:
1281
          val = live_data[node.name].get(field, None)
1282
        else:
1283
          raise errors.ParameterError(field)
1284
        node_output.append(val)
1285
      output.append(node_output)
1286

    
1287
    return output
1288

    
1289

    
1290
class LUQueryNodeVolumes(NoHooksLU):
1291
  """Logical unit for getting volumes on node(s).
1292

1293
  """
1294
  _OP_REQP = ["nodes", "output_fields"]
1295

    
1296
  def CheckPrereq(self):
1297
    """Check prerequisites.
1298

1299
    This checks that the fields required are valid output fields.
1300

1301
    """
1302
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1303

    
1304
    _CheckOutputFields(static=["node"],
1305
                       dynamic=["phys", "vg", "name", "size", "instance"],
1306
                       selected=self.op.output_fields)
1307

    
1308

    
1309
  def Exec(self, feedback_fn):
1310
    """Computes the list of nodes and their attributes.
1311

1312
    """
1313
    nodenames = self.nodes
1314
    volumes = rpc.call_node_volumes(nodenames)
1315

    
1316
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1317
             in self.cfg.GetInstanceList()]
1318

    
1319
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1320

    
1321
    output = []
1322
    for node in nodenames:
1323
      if node not in volumes or not volumes[node]:
1324
        continue
1325

    
1326
      node_vols = volumes[node][:]
1327
      node_vols.sort(key=lambda vol: vol['dev'])
1328

    
1329
      for vol in node_vols:
1330
        node_output = []
1331
        for field in self.op.output_fields:
1332
          if field == "node":
1333
            val = node
1334
          elif field == "phys":
1335
            val = vol['dev']
1336
          elif field == "vg":
1337
            val = vol['vg']
1338
          elif field == "name":
1339
            val = vol['name']
1340
          elif field == "size":
1341
            val = int(float(vol['size']))
1342
          elif field == "instance":
1343
            for inst in ilist:
1344
              if node not in lv_by_node[inst]:
1345
                continue
1346
              if vol['name'] in lv_by_node[inst][node]:
1347
                val = inst.name
1348
                break
1349
            else:
1350
              val = '-'
1351
          else:
1352
            raise errors.ParameterError(field)
1353
          node_output.append(str(val))
1354

    
1355
        output.append(node_output)
1356

    
1357
    return output
1358

    
1359

    
1360
class LUAddNode(LogicalUnit):
1361
  """Logical unit for adding node to the cluster.
1362

1363
  """
1364
  HPATH = "node-add"
1365
  HTYPE = constants.HTYPE_NODE
1366
  _OP_REQP = ["node_name"]
1367

    
1368
  def BuildHooksEnv(self):
1369
    """Build hooks env.
1370

1371
    This will run on all nodes before, and on all nodes + the new node after.
1372

1373
    """
1374
    env = {
1375
      "OP_TARGET": self.op.node_name,
1376
      "NODE_NAME": self.op.node_name,
1377
      "NODE_PIP": self.op.primary_ip,
1378
      "NODE_SIP": self.op.secondary_ip,
1379
      }
1380
    nodes_0 = self.cfg.GetNodeList()
1381
    nodes_1 = nodes_0 + [self.op.node_name, ]
1382
    return env, nodes_0, nodes_1
1383

    
1384
  def CheckPrereq(self):
1385
    """Check prerequisites.
1386

1387
    This checks:
1388
     - the new node is not already in the config
1389
     - it is resolvable
1390
     - its parameters (single/dual homed) matches the cluster
1391

1392
    Any errors are signalled by raising errors.OpPrereqError.
1393

1394
    """
1395
    node_name = self.op.node_name
1396
    cfg = self.cfg
1397

    
1398
    dns_data = utils.HostInfo(node_name)
1399

    
1400
    node = dns_data.name
1401
    primary_ip = self.op.primary_ip = dns_data.ip
1402
    secondary_ip = getattr(self.op, "secondary_ip", None)
1403
    if secondary_ip is None:
1404
      secondary_ip = primary_ip
1405
    if not utils.IsValidIP(secondary_ip):
1406
      raise errors.OpPrereqError("Invalid secondary IP given")
1407
    self.op.secondary_ip = secondary_ip
1408
    node_list = cfg.GetNodeList()
1409
    if node in node_list:
1410
      raise errors.OpPrereqError("Node %s is already in the configuration"
1411
                                 % node)
1412

    
1413
    for existing_node_name in node_list:
1414
      existing_node = cfg.GetNodeInfo(existing_node_name)
1415
      if (existing_node.primary_ip == primary_ip or
1416
          existing_node.secondary_ip == primary_ip or
1417
          existing_node.primary_ip == secondary_ip or
1418
          existing_node.secondary_ip == secondary_ip):
1419
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1420
                                   " existing node %s" % existing_node.name)
1421

    
1422
    # check that the type of the node (single versus dual homed) is the
1423
    # same as for the master
1424
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1425
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1426
    newbie_singlehomed = secondary_ip == primary_ip
1427
    if master_singlehomed != newbie_singlehomed:
1428
      if master_singlehomed:
1429
        raise errors.OpPrereqError("The master has no private ip but the"
1430
                                   " new node has one")
1431
      else:
1432
        raise errors.OpPrereqError("The master has a private ip but the"
1433
                                   " new node doesn't have one")
1434

    
1435
    # checks reachablity
1436
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1437
      raise errors.OpPrereqError("Node not reachable by ping")
1438

    
1439
    if not newbie_singlehomed:
1440
      # check reachability from my secondary ip to newbie's secondary ip
1441
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1442
                           source=myself.secondary_ip):
1443
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1444
                                   " based ping to noded port")
1445

    
1446
    self.new_node = objects.Node(name=node,
1447
                                 primary_ip=primary_ip,
1448
                                 secondary_ip=secondary_ip)
1449

    
1450
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1451
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1452
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1453
                                   constants.VNC_PASSWORD_FILE)
1454

    
1455
  def Exec(self, feedback_fn):
1456
    """Adds the new node to the cluster.
1457

1458
    """
1459
    new_node = self.new_node
1460
    node = new_node.name
1461

    
1462
    # set up inter-node password and certificate and restarts the node daemon
1463
    gntpass = self.sstore.GetNodeDaemonPassword()
1464
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1465
      raise errors.OpExecError("ganeti password corruption detected")
1466
    f = open(constants.SSL_CERT_FILE)
1467
    try:
1468
      gntpem = f.read(8192)
1469
    finally:
1470
      f.close()
1471
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1472
    # so we use this to detect an invalid certificate; as long as the
1473
    # cert doesn't contain this, the here-document will be correctly
1474
    # parsed by the shell sequence below
1475
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1476
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1477
    if not gntpem.endswith("\n"):
1478
      raise errors.OpExecError("PEM must end with newline")
1479
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1480

    
1481
    # and then connect with ssh to set password and start ganeti-noded
1482
    # note that all the below variables are sanitized at this point,
1483
    # either by being constants or by the checks above
1484
    ss = self.sstore
1485
    mycommand = ("umask 077 && "
1486
                 "echo '%s' > '%s' && "
1487
                 "cat > '%s' << '!EOF.' && \n"
1488
                 "%s!EOF.\n%s restart" %
1489
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1490
                  constants.SSL_CERT_FILE, gntpem,
1491
                  constants.NODE_INITD_SCRIPT))
1492

    
1493
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1494
    if result.failed:
1495
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1496
                               " output: %s" %
1497
                               (node, result.fail_reason, result.output))
1498

    
1499
    # check connectivity
1500
    time.sleep(4)
1501

    
1502
    result = rpc.call_version([node])[node]
1503
    if result:
1504
      if constants.PROTOCOL_VERSION == result:
1505
        logger.Info("communication to node %s fine, sw version %s match" %
1506
                    (node, result))
1507
      else:
1508
        raise errors.OpExecError("Version mismatch master version %s,"
1509
                                 " node version %s" %
1510
                                 (constants.PROTOCOL_VERSION, result))
1511
    else:
1512
      raise errors.OpExecError("Cannot get version from the new node")
1513

    
1514
    # setup ssh on node
1515
    logger.Info("copy ssh key to node %s" % node)
1516
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1517
    keyarray = []
1518
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1519
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1520
                priv_key, pub_key]
1521

    
1522
    for i in keyfiles:
1523
      f = open(i, 'r')
1524
      try:
1525
        keyarray.append(f.read())
1526
      finally:
1527
        f.close()
1528

    
1529
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1530
                               keyarray[3], keyarray[4], keyarray[5])
1531

    
1532
    if not result:
1533
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1534

    
1535
    # Add node to our /etc/hosts, and add key to known_hosts
1536
    _AddHostToEtcHosts(new_node.name)
1537

    
1538
    if new_node.secondary_ip != new_node.primary_ip:
1539
      if not rpc.call_node_tcp_ping(new_node.name,
1540
                                    constants.LOCALHOST_IP_ADDRESS,
1541
                                    new_node.secondary_ip,
1542
                                    constants.DEFAULT_NODED_PORT,
1543
                                    10, False):
1544
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1545
                                 " you gave (%s). Please fix and re-run this"
1546
                                 " command." % new_node.secondary_ip)
1547

    
1548
    success, msg = self.ssh.VerifyNodeHostname(node)
1549
    if not success:
1550
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1551
                               " than the one the resolver gives: %s."
1552
                               " Please fix and re-run this command." %
1553
                               (node, msg))
1554

    
1555
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1556
    # including the node just added
1557
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1558
    dist_nodes = self.cfg.GetNodeList() + [node]
1559
    if myself.name in dist_nodes:
1560
      dist_nodes.remove(myself.name)
1561

    
1562
    logger.Debug("Copying hosts and known_hosts to all nodes")
1563
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1564
      result = rpc.call_upload_file(dist_nodes, fname)
1565
      for to_node in dist_nodes:
1566
        if not result[to_node]:
1567
          logger.Error("copy of file %s to node %s failed" %
1568
                       (fname, to_node))
1569

    
1570
    to_copy = ss.GetFileList()
1571
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1572
      to_copy.append(constants.VNC_PASSWORD_FILE)
1573
    for fname in to_copy:
1574
      if not self.ssh.CopyFileToNode(node, fname):
1575
        logger.Error("could not copy file %s to node %s" % (fname, node))
1576

    
1577
    logger.Info("adding node %s to cluster.conf" % node)
1578
    self.cfg.AddNode(new_node)
1579

    
1580

    
1581
class LUMasterFailover(LogicalUnit):
1582
  """Failover the master node to the current node.
1583

1584
  This is a special LU in that it must run on a non-master node.
1585

1586
  """
1587
  HPATH = "master-failover"
1588
  HTYPE = constants.HTYPE_CLUSTER
1589
  REQ_MASTER = False
1590
  _OP_REQP = []
1591

    
1592
  def BuildHooksEnv(self):
1593
    """Build hooks env.
1594

1595
    This will run on the new master only in the pre phase, and on all
1596
    the nodes in the post phase.
1597

1598
    """
1599
    env = {
1600
      "OP_TARGET": self.new_master,
1601
      "NEW_MASTER": self.new_master,
1602
      "OLD_MASTER": self.old_master,
1603
      }
1604
    return env, [self.new_master], self.cfg.GetNodeList()
1605

    
1606
  def CheckPrereq(self):
1607
    """Check prerequisites.
1608

1609
    This checks that we are not already the master.
1610

1611
    """
1612
    self.new_master = utils.HostInfo().name
1613
    self.old_master = self.sstore.GetMasterNode()
1614

    
1615
    if self.old_master == self.new_master:
1616
      raise errors.OpPrereqError("This commands must be run on the node"
1617
                                 " where you want the new master to be."
1618
                                 " %s is already the master" %
1619
                                 self.old_master)
1620

    
1621
  def Exec(self, feedback_fn):
1622
    """Failover the master node.
1623

1624
    This command, when run on a non-master node, will cause the current
1625
    master to cease being master, and the non-master to become new
1626
    master.
1627

1628
    """
1629
    #TODO: do not rely on gethostname returning the FQDN
1630
    logger.Info("setting master to %s, old master: %s" %
1631
                (self.new_master, self.old_master))
1632

    
1633
    if not rpc.call_node_stop_master(self.old_master):
1634
      logger.Error("could disable the master role on the old master"
1635
                   " %s, please disable manually" % self.old_master)
1636

    
1637
    ss = self.sstore
1638
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1639
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1640
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1641
      logger.Error("could not distribute the new simple store master file"
1642
                   " to the other nodes, please check.")
1643

    
1644
    if not rpc.call_node_start_master(self.new_master):
1645
      logger.Error("could not start the master role on the new master"
1646
                   " %s, please check" % self.new_master)
1647
      feedback_fn("Error in activating the master IP on the new master,"
1648
                  " please fix manually.")
1649

    
1650

    
1651

    
1652
class LUQueryClusterInfo(NoHooksLU):
1653
  """Query cluster configuration.
1654

1655
  """
1656
  _OP_REQP = []
1657
  REQ_MASTER = False
1658

    
1659
  def CheckPrereq(self):
1660
    """No prerequsites needed for this LU.
1661

1662
    """
1663
    pass
1664

    
1665
  def Exec(self, feedback_fn):
1666
    """Return cluster config.
1667

1668
    """
1669
    result = {
1670
      "name": self.sstore.GetClusterName(),
1671
      "software_version": constants.RELEASE_VERSION,
1672
      "protocol_version": constants.PROTOCOL_VERSION,
1673
      "config_version": constants.CONFIG_VERSION,
1674
      "os_api_version": constants.OS_API_VERSION,
1675
      "export_version": constants.EXPORT_VERSION,
1676
      "master": self.sstore.GetMasterNode(),
1677
      "architecture": (platform.architecture()[0], platform.machine()),
1678
      }
1679

    
1680
    return result
1681

    
1682

    
1683
class LUClusterCopyFile(NoHooksLU):
1684
  """Copy file to cluster.
1685

1686
  """
1687
  _OP_REQP = ["nodes", "filename"]
1688

    
1689
  def CheckPrereq(self):
1690
    """Check prerequisites.
1691

1692
    It should check that the named file exists and that the given list
1693
    of nodes is valid.
1694

1695
    """
1696
    if not os.path.exists(self.op.filename):
1697
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1698

    
1699
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1700

    
1701
  def Exec(self, feedback_fn):
1702
    """Copy a file from master to some nodes.
1703

1704
    Args:
1705
      opts - class with options as members
1706
      args - list containing a single element, the file name
1707
    Opts used:
1708
      nodes - list containing the name of target nodes; if empty, all nodes
1709

1710
    """
1711
    filename = self.op.filename
1712

    
1713
    myname = utils.HostInfo().name
1714

    
1715
    for node in self.nodes:
1716
      if node == myname:
1717
        continue
1718
      if not self.ssh.CopyFileToNode(node, filename):
1719
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1720

    
1721

    
1722
class LUDumpClusterConfig(NoHooksLU):
1723
  """Return a text-representation of the cluster-config.
1724

1725
  """
1726
  _OP_REQP = []
1727

    
1728
  def CheckPrereq(self):
1729
    """No prerequisites.
1730

1731
    """
1732
    pass
1733

    
1734
  def Exec(self, feedback_fn):
1735
    """Dump a representation of the cluster config to the standard output.
1736

1737
    """
1738
    return self.cfg.DumpConfig()
1739

    
1740

    
1741
class LURunClusterCommand(NoHooksLU):
1742
  """Run a command on some nodes.
1743

1744
  """
1745
  _OP_REQP = ["command", "nodes"]
1746

    
1747
  def CheckPrereq(self):
1748
    """Check prerequisites.
1749

1750
    It checks that the given list of nodes is valid.
1751

1752
    """
1753
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1754

    
1755
  def Exec(self, feedback_fn):
1756
    """Run a command on some nodes.
1757

1758
    """
1759
    data = []
1760
    for node in self.nodes:
1761
      result = self.ssh.Run(node, "root", self.op.command)
1762
      data.append((node, result.output, result.exit_code))
1763

    
1764
    return data
1765

    
1766

    
1767
class LUActivateInstanceDisks(NoHooksLU):
1768
  """Bring up an instance's disks.
1769

1770
  """
1771
  _OP_REQP = ["instance_name"]
1772

    
1773
  def CheckPrereq(self):
1774
    """Check prerequisites.
1775

1776
    This checks that the instance is in the cluster.
1777

1778
    """
1779
    instance = self.cfg.GetInstanceInfo(
1780
      self.cfg.ExpandInstanceName(self.op.instance_name))
1781
    if instance is None:
1782
      raise errors.OpPrereqError("Instance '%s' not known" %
1783
                                 self.op.instance_name)
1784
    self.instance = instance
1785

    
1786

    
1787
  def Exec(self, feedback_fn):
1788
    """Activate the disks.
1789

1790
    """
1791
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1792
    if not disks_ok:
1793
      raise errors.OpExecError("Cannot activate block devices")
1794

    
1795
    return disks_info
1796

    
1797

    
1798
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1799
  """Prepare the block devices for an instance.
1800

1801
  This sets up the block devices on all nodes.
1802

1803
  Args:
1804
    instance: a ganeti.objects.Instance object
1805
    ignore_secondaries: if true, errors on secondary nodes won't result
1806
                        in an error return from the function
1807

1808
  Returns:
1809
    false if the operation failed
1810
    list of (host, instance_visible_name, node_visible_name) if the operation
1811
         suceeded with the mapping from node devices to instance devices
1812
  """
1813
  device_info = []
1814
  disks_ok = True
1815
  iname = instance.name
1816
  # With the two passes mechanism we try to reduce the window of
1817
  # opportunity for the race condition of switching DRBD to primary
1818
  # before handshaking occured, but we do not eliminate it
1819

    
1820
  # The proper fix would be to wait (with some limits) until the
1821
  # connection has been made and drbd transitions from WFConnection
1822
  # into any other network-connected state (Connected, SyncTarget,
1823
  # SyncSource, etc.)
1824

    
1825
  # 1st pass, assemble on all nodes in secondary mode
1826
  for inst_disk in instance.disks:
1827
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1828
      cfg.SetDiskID(node_disk, node)
1829
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1830
      if not result:
1831
        logger.Error("could not prepare block device %s on node %s"
1832
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1833
        if not ignore_secondaries:
1834
          disks_ok = False
1835

    
1836
  # FIXME: race condition on drbd migration to primary
1837

    
1838
  # 2nd pass, do only the primary node
1839
  for inst_disk in instance.disks:
1840
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1841
      if node != instance.primary_node:
1842
        continue
1843
      cfg.SetDiskID(node_disk, node)
1844
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1845
      if not result:
1846
        logger.Error("could not prepare block device %s on node %s"
1847
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1848
        disks_ok = False
1849
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1850

    
1851
  # leave the disks configured for the primary node
1852
  # this is a workaround that would be fixed better by
1853
  # improving the logical/physical id handling
1854
  for disk in instance.disks:
1855
    cfg.SetDiskID(disk, instance.primary_node)
1856

    
1857
  return disks_ok, device_info
1858

    
1859

    
1860
def _StartInstanceDisks(cfg, instance, force):
1861
  """Start the disks of an instance.
1862

1863
  """
1864
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1865
                                           ignore_secondaries=force)
1866
  if not disks_ok:
1867
    _ShutdownInstanceDisks(instance, cfg)
1868
    if force is not None and not force:
1869
      logger.Error("If the message above refers to a secondary node,"
1870
                   " you can retry the operation using '--force'.")
1871
    raise errors.OpExecError("Disk consistency error")
1872

    
1873

    
1874
class LUDeactivateInstanceDisks(NoHooksLU):
1875
  """Shutdown an instance's disks.
1876

1877
  """
1878
  _OP_REQP = ["instance_name"]
1879

    
1880
  def CheckPrereq(self):
1881
    """Check prerequisites.
1882

1883
    This checks that the instance is in the cluster.
1884

1885
    """
1886
    instance = self.cfg.GetInstanceInfo(
1887
      self.cfg.ExpandInstanceName(self.op.instance_name))
1888
    if instance is None:
1889
      raise errors.OpPrereqError("Instance '%s' not known" %
1890
                                 self.op.instance_name)
1891
    self.instance = instance
1892

    
1893
  def Exec(self, feedback_fn):
1894
    """Deactivate the disks
1895

1896
    """
1897
    instance = self.instance
1898
    ins_l = rpc.call_instance_list([instance.primary_node])
1899
    ins_l = ins_l[instance.primary_node]
1900
    if not type(ins_l) is list:
1901
      raise errors.OpExecError("Can't contact node '%s'" %
1902
                               instance.primary_node)
1903

    
1904
    if self.instance.name in ins_l:
1905
      raise errors.OpExecError("Instance is running, can't shutdown"
1906
                               " block devices.")
1907

    
1908
    _ShutdownInstanceDisks(instance, self.cfg)
1909

    
1910

    
1911
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1912
  """Shutdown block devices of an instance.
1913

1914
  This does the shutdown on all nodes of the instance.
1915

1916
  If the ignore_primary is false, errors on the primary node are
1917
  ignored.
1918

1919
  """
1920
  result = True
1921
  for disk in instance.disks:
1922
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1923
      cfg.SetDiskID(top_disk, node)
1924
      if not rpc.call_blockdev_shutdown(node, top_disk):
1925
        logger.Error("could not shutdown block device %s on node %s" %
1926
                     (disk.iv_name, node))
1927
        if not ignore_primary or node != instance.primary_node:
1928
          result = False
1929
  return result
1930

    
1931

    
1932
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1933
  """Checks if a node has enough free memory.
1934

1935
  This function check if a given node has the needed amount of free
1936
  memory. In case the node has less memory or we cannot get the
1937
  information from the node, this function raise an OpPrereqError
1938
  exception.
1939

1940
  Args:
1941
    - cfg: a ConfigWriter instance
1942
    - node: the node name
1943
    - reason: string to use in the error message
1944
    - requested: the amount of memory in MiB
1945

1946
  """
1947
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1948
  if not nodeinfo or not isinstance(nodeinfo, dict):
1949
    raise errors.OpPrereqError("Could not contact node %s for resource"
1950
                             " information" % (node,))
1951

    
1952
  free_mem = nodeinfo[node].get('memory_free')
1953
  if not isinstance(free_mem, int):
1954
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1955
                             " was '%s'" % (node, free_mem))
1956
  if requested > free_mem:
1957
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1958
                             " needed %s MiB, available %s MiB" %
1959
                             (node, reason, requested, free_mem))
1960

    
1961

    
1962
class LUStartupInstance(LogicalUnit):
1963
  """Starts an instance.
1964

1965
  """
1966
  HPATH = "instance-start"
1967
  HTYPE = constants.HTYPE_INSTANCE
1968
  _OP_REQP = ["instance_name", "force"]
1969

    
1970
  def BuildHooksEnv(self):
1971
    """Build hooks env.
1972

1973
    This runs on master, primary and secondary nodes of the instance.
1974

1975
    """
1976
    env = {
1977
      "FORCE": self.op.force,
1978
      }
1979
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1980
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1981
          list(self.instance.secondary_nodes))
1982
    return env, nl, nl
1983

    
1984
  def CheckPrereq(self):
1985
    """Check prerequisites.
1986

1987
    This checks that the instance is in the cluster.
1988

1989
    """
1990
    instance = self.cfg.GetInstanceInfo(
1991
      self.cfg.ExpandInstanceName(self.op.instance_name))
1992
    if instance is None:
1993
      raise errors.OpPrereqError("Instance '%s' not known" %
1994
                                 self.op.instance_name)
1995

    
1996
    # check bridges existance
1997
    _CheckInstanceBridgesExist(instance)
1998

    
1999
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2000
                         "starting instance %s" % instance.name,
2001
                         instance.memory)
2002

    
2003
    self.instance = instance
2004
    self.op.instance_name = instance.name
2005

    
2006
  def Exec(self, feedback_fn):
2007
    """Start the instance.
2008

2009
    """
2010
    instance = self.instance
2011
    force = self.op.force
2012
    extra_args = getattr(self.op, "extra_args", "")
2013

    
2014
    node_current = instance.primary_node
2015

    
2016
    _StartInstanceDisks(self.cfg, instance, force)
2017

    
2018
    if not rpc.call_instance_start(node_current, instance, extra_args):
2019
      _ShutdownInstanceDisks(instance, self.cfg)
2020
      raise errors.OpExecError("Could not start instance")
2021

    
2022
    self.cfg.MarkInstanceUp(instance.name)
2023

    
2024

    
2025
class LURebootInstance(LogicalUnit):
2026
  """Reboot an instance.
2027

2028
  """
2029
  HPATH = "instance-reboot"
2030
  HTYPE = constants.HTYPE_INSTANCE
2031
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2032

    
2033
  def BuildHooksEnv(self):
2034
    """Build hooks env.
2035

2036
    This runs on master, primary and secondary nodes of the instance.
2037

2038
    """
2039
    env = {
2040
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2041
      }
2042
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2043
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2044
          list(self.instance.secondary_nodes))
2045
    return env, nl, nl
2046

    
2047
  def CheckPrereq(self):
2048
    """Check prerequisites.
2049

2050
    This checks that the instance is in the cluster.
2051

2052
    """
2053
    instance = self.cfg.GetInstanceInfo(
2054
      self.cfg.ExpandInstanceName(self.op.instance_name))
2055
    if instance is None:
2056
      raise errors.OpPrereqError("Instance '%s' not known" %
2057
                                 self.op.instance_name)
2058

    
2059
    # check bridges existance
2060
    _CheckInstanceBridgesExist(instance)
2061

    
2062
    self.instance = instance
2063
    self.op.instance_name = instance.name
2064

    
2065
  def Exec(self, feedback_fn):
2066
    """Reboot the instance.
2067

2068
    """
2069
    instance = self.instance
2070
    ignore_secondaries = self.op.ignore_secondaries
2071
    reboot_type = self.op.reboot_type
2072
    extra_args = getattr(self.op, "extra_args", "")
2073

    
2074
    node_current = instance.primary_node
2075

    
2076
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2077
                           constants.INSTANCE_REBOOT_HARD,
2078
                           constants.INSTANCE_REBOOT_FULL]:
2079
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2080
                                  (constants.INSTANCE_REBOOT_SOFT,
2081
                                   constants.INSTANCE_REBOOT_HARD,
2082
                                   constants.INSTANCE_REBOOT_FULL))
2083

    
2084
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2085
                       constants.INSTANCE_REBOOT_HARD]:
2086
      if not rpc.call_instance_reboot(node_current, instance,
2087
                                      reboot_type, extra_args):
2088
        raise errors.OpExecError("Could not reboot instance")
2089
    else:
2090
      if not rpc.call_instance_shutdown(node_current, instance):
2091
        raise errors.OpExecError("could not shutdown instance for full reboot")
2092
      _ShutdownInstanceDisks(instance, self.cfg)
2093
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2094
      if not rpc.call_instance_start(node_current, instance, extra_args):
2095
        _ShutdownInstanceDisks(instance, self.cfg)
2096
        raise errors.OpExecError("Could not start instance for full reboot")
2097

    
2098
    self.cfg.MarkInstanceUp(instance.name)
2099

    
2100

    
2101
class LUShutdownInstance(LogicalUnit):
2102
  """Shutdown an instance.
2103

2104
  """
2105
  HPATH = "instance-stop"
2106
  HTYPE = constants.HTYPE_INSTANCE
2107
  _OP_REQP = ["instance_name"]
2108

    
2109
  def BuildHooksEnv(self):
2110
    """Build hooks env.
2111

2112
    This runs on master, primary and secondary nodes of the instance.
2113

2114
    """
2115
    env = _BuildInstanceHookEnvByObject(self.instance)
2116
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2117
          list(self.instance.secondary_nodes))
2118
    return env, nl, nl
2119

    
2120
  def CheckPrereq(self):
2121
    """Check prerequisites.
2122

2123
    This checks that the instance is in the cluster.
2124

2125
    """
2126
    instance = self.cfg.GetInstanceInfo(
2127
      self.cfg.ExpandInstanceName(self.op.instance_name))
2128
    if instance is None:
2129
      raise errors.OpPrereqError("Instance '%s' not known" %
2130
                                 self.op.instance_name)
2131
    self.instance = instance
2132

    
2133
  def Exec(self, feedback_fn):
2134
    """Shutdown the instance.
2135

2136
    """
2137
    instance = self.instance
2138
    node_current = instance.primary_node
2139
    if not rpc.call_instance_shutdown(node_current, instance):
2140
      logger.Error("could not shutdown instance")
2141

    
2142
    self.cfg.MarkInstanceDown(instance.name)
2143
    _ShutdownInstanceDisks(instance, self.cfg)
2144

    
2145

    
2146
class LUReinstallInstance(LogicalUnit):
2147
  """Reinstall an instance.
2148

2149
  """
2150
  HPATH = "instance-reinstall"
2151
  HTYPE = constants.HTYPE_INSTANCE
2152
  _OP_REQP = ["instance_name"]
2153

    
2154
  def BuildHooksEnv(self):
2155
    """Build hooks env.
2156

2157
    This runs on master, primary and secondary nodes of the instance.
2158

2159
    """
2160
    env = _BuildInstanceHookEnvByObject(self.instance)
2161
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2162
          list(self.instance.secondary_nodes))
2163
    return env, nl, nl
2164

    
2165
  def CheckPrereq(self):
2166
    """Check prerequisites.
2167

2168
    This checks that the instance is in the cluster and is not running.
2169

2170
    """
2171
    instance = self.cfg.GetInstanceInfo(
2172
      self.cfg.ExpandInstanceName(self.op.instance_name))
2173
    if instance is None:
2174
      raise errors.OpPrereqError("Instance '%s' not known" %
2175
                                 self.op.instance_name)
2176
    if instance.disk_template == constants.DT_DISKLESS:
2177
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2178
                                 self.op.instance_name)
2179
    if instance.status != "down":
2180
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2181
                                 self.op.instance_name)
2182
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2183
    if remote_info:
2184
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2185
                                 (self.op.instance_name,
2186
                                  instance.primary_node))
2187

    
2188
    self.op.os_type = getattr(self.op, "os_type", None)
2189
    if self.op.os_type is not None:
2190
      # OS verification
2191
      pnode = self.cfg.GetNodeInfo(
2192
        self.cfg.ExpandNodeName(instance.primary_node))
2193
      if pnode is None:
2194
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2195
                                   self.op.pnode)
2196
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2197
      if not os_obj:
2198
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2199
                                   " primary node"  % self.op.os_type)
2200

    
2201
    self.instance = instance
2202

    
2203
  def Exec(self, feedback_fn):
2204
    """Reinstall the instance.
2205

2206
    """
2207
    inst = self.instance
2208

    
2209
    if self.op.os_type is not None:
2210
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2211
      inst.os = self.op.os_type
2212
      self.cfg.AddInstance(inst)
2213

    
2214
    _StartInstanceDisks(self.cfg, inst, None)
2215
    try:
2216
      feedback_fn("Running the instance OS create scripts...")
2217
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2218
        raise errors.OpExecError("Could not install OS for instance %s"
2219
                                 " on node %s" %
2220
                                 (inst.name, inst.primary_node))
2221
    finally:
2222
      _ShutdownInstanceDisks(inst, self.cfg)
2223

    
2224

    
2225
class LURenameInstance(LogicalUnit):
2226
  """Rename an instance.
2227

2228
  """
2229
  HPATH = "instance-rename"
2230
  HTYPE = constants.HTYPE_INSTANCE
2231
  _OP_REQP = ["instance_name", "new_name"]
2232

    
2233
  def BuildHooksEnv(self):
2234
    """Build hooks env.
2235

2236
    This runs on master, primary and secondary nodes of the instance.
2237

2238
    """
2239
    env = _BuildInstanceHookEnvByObject(self.instance)
2240
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2241
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2242
          list(self.instance.secondary_nodes))
2243
    return env, nl, nl
2244

    
2245
  def CheckPrereq(self):
2246
    """Check prerequisites.
2247

2248
    This checks that the instance is in the cluster and is not running.
2249

2250
    """
2251
    instance = self.cfg.GetInstanceInfo(
2252
      self.cfg.ExpandInstanceName(self.op.instance_name))
2253
    if instance is None:
2254
      raise errors.OpPrereqError("Instance '%s' not known" %
2255
                                 self.op.instance_name)
2256
    if instance.status != "down":
2257
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2258
                                 self.op.instance_name)
2259
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2260
    if remote_info:
2261
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2262
                                 (self.op.instance_name,
2263
                                  instance.primary_node))
2264
    self.instance = instance
2265

    
2266
    # new name verification
2267
    name_info = utils.HostInfo(self.op.new_name)
2268

    
2269
    self.op.new_name = new_name = name_info.name
2270
    instance_list = self.cfg.GetInstanceList()
2271
    if new_name in instance_list:
2272
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2273
                                 instance_name)
2274

    
2275
    if not getattr(self.op, "ignore_ip", False):
2276
      command = ["fping", "-q", name_info.ip]
2277
      result = utils.RunCmd(command)
2278
      if not result.failed:
2279
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2280
                                   (name_info.ip, new_name))
2281

    
2282

    
2283
  def Exec(self, feedback_fn):
2284
    """Reinstall the instance.
2285

2286
    """
2287
    inst = self.instance
2288
    old_name = inst.name
2289

    
2290
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2291

    
2292
    # re-read the instance from the configuration after rename
2293
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2294

    
2295
    _StartInstanceDisks(self.cfg, inst, None)
2296
    try:
2297
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2298
                                          "sda", "sdb"):
2299
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2300
               " instance has been renamed in Ganeti)" %
2301
               (inst.name, inst.primary_node))
2302
        logger.Error(msg)
2303
    finally:
2304
      _ShutdownInstanceDisks(inst, self.cfg)
2305

    
2306

    
2307
class LURemoveInstance(LogicalUnit):
2308
  """Remove an instance.
2309

2310
  """
2311
  HPATH = "instance-remove"
2312
  HTYPE = constants.HTYPE_INSTANCE
2313
  _OP_REQP = ["instance_name"]
2314

    
2315
  def BuildHooksEnv(self):
2316
    """Build hooks env.
2317

2318
    This runs on master, primary and secondary nodes of the instance.
2319

2320
    """
2321
    env = _BuildInstanceHookEnvByObject(self.instance)
2322
    nl = [self.sstore.GetMasterNode()]
2323
    return env, nl, nl
2324

    
2325
  def CheckPrereq(self):
2326
    """Check prerequisites.
2327

2328
    This checks that the instance is in the cluster.
2329

2330
    """
2331
    instance = self.cfg.GetInstanceInfo(
2332
      self.cfg.ExpandInstanceName(self.op.instance_name))
2333
    if instance is None:
2334
      raise errors.OpPrereqError("Instance '%s' not known" %
2335
                                 self.op.instance_name)
2336
    self.instance = instance
2337

    
2338
  def Exec(self, feedback_fn):
2339
    """Remove the instance.
2340

2341
    """
2342
    instance = self.instance
2343
    logger.Info("shutting down instance %s on node %s" %
2344
                (instance.name, instance.primary_node))
2345

    
2346
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2347
      if self.op.ignore_failures:
2348
        feedback_fn("Warning: can't shutdown instance")
2349
      else:
2350
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2351
                                 (instance.name, instance.primary_node))
2352

    
2353
    logger.Info("removing block devices for instance %s" % instance.name)
2354

    
2355
    if not _RemoveDisks(instance, self.cfg):
2356
      if self.op.ignore_failures:
2357
        feedback_fn("Warning: can't remove instance's disks")
2358
      else:
2359
        raise errors.OpExecError("Can't remove instance's disks")
2360

    
2361
    logger.Info("removing instance %s out of cluster config" % instance.name)
2362

    
2363
    self.cfg.RemoveInstance(instance.name)
2364

    
2365

    
2366
class LUQueryInstances(NoHooksLU):
2367
  """Logical unit for querying instances.
2368

2369
  """
2370
  _OP_REQP = ["output_fields", "names"]
2371

    
2372
  def CheckPrereq(self):
2373
    """Check prerequisites.
2374

2375
    This checks that the fields required are valid output fields.
2376

2377
    """
2378
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2379
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2380
                               "admin_state", "admin_ram",
2381
                               "disk_template", "ip", "mac", "bridge",
2382
                               "sda_size", "sdb_size", "vcpus"],
2383
                       dynamic=self.dynamic_fields,
2384
                       selected=self.op.output_fields)
2385

    
2386
    self.wanted = _GetWantedInstances(self, self.op.names)
2387

    
2388
  def Exec(self, feedback_fn):
2389
    """Computes the list of nodes and their attributes.
2390

2391
    """
2392
    instance_names = self.wanted
2393
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2394
                     in instance_names]
2395

    
2396
    # begin data gathering
2397

    
2398
    nodes = frozenset([inst.primary_node for inst in instance_list])
2399

    
2400
    bad_nodes = []
2401
    if self.dynamic_fields.intersection(self.op.output_fields):
2402
      live_data = {}
2403
      node_data = rpc.call_all_instances_info(nodes)
2404
      for name in nodes:
2405
        result = node_data[name]
2406
        if result:
2407
          live_data.update(result)
2408
        elif result == False:
2409
          bad_nodes.append(name)
2410
        # else no instance is alive
2411
    else:
2412
      live_data = dict([(name, {}) for name in instance_names])
2413

    
2414
    # end data gathering
2415

    
2416
    output = []
2417
    for instance in instance_list:
2418
      iout = []
2419
      for field in self.op.output_fields:
2420
        if field == "name":
2421
          val = instance.name
2422
        elif field == "os":
2423
          val = instance.os
2424
        elif field == "pnode":
2425
          val = instance.primary_node
2426
        elif field == "snodes":
2427
          val = list(instance.secondary_nodes)
2428
        elif field == "admin_state":
2429
          val = (instance.status != "down")
2430
        elif field == "oper_state":
2431
          if instance.primary_node in bad_nodes:
2432
            val = None
2433
          else:
2434
            val = bool(live_data.get(instance.name))
2435
        elif field == "status":
2436
          if instance.primary_node in bad_nodes:
2437
            val = "ERROR_nodedown"
2438
          else:
2439
            running = bool(live_data.get(instance.name))
2440
            if running:
2441
              if instance.status != "down":
2442
                val = "running"
2443
              else:
2444
                val = "ERROR_up"
2445
            else:
2446
              if instance.status != "down":
2447
                val = "ERROR_down"
2448
              else:
2449
                val = "ADMIN_down"
2450
        elif field == "admin_ram":
2451
          val = instance.memory
2452
        elif field == "oper_ram":
2453
          if instance.primary_node in bad_nodes:
2454
            val = None
2455
          elif instance.name in live_data:
2456
            val = live_data[instance.name].get("memory", "?")
2457
          else:
2458
            val = "-"
2459
        elif field == "disk_template":
2460
          val = instance.disk_template
2461
        elif field == "ip":
2462
          val = instance.nics[0].ip
2463
        elif field == "bridge":
2464
          val = instance.nics[0].bridge
2465
        elif field == "mac":
2466
          val = instance.nics[0].mac
2467
        elif field == "sda_size" or field == "sdb_size":
2468
          disk = instance.FindDisk(field[:3])
2469
          if disk is None:
2470
            val = None
2471
          else:
2472
            val = disk.size
2473
        elif field == "vcpus":
2474
          val = instance.vcpus
2475
        else:
2476
          raise errors.ParameterError(field)
2477
        iout.append(val)
2478
      output.append(iout)
2479

    
2480
    return output
2481

    
2482

    
2483
class LUFailoverInstance(LogicalUnit):
2484
  """Failover an instance.
2485

2486
  """
2487
  HPATH = "instance-failover"
2488
  HTYPE = constants.HTYPE_INSTANCE
2489
  _OP_REQP = ["instance_name", "ignore_consistency"]
2490

    
2491
  def BuildHooksEnv(self):
2492
    """Build hooks env.
2493

2494
    This runs on master, primary and secondary nodes of the instance.
2495

2496
    """
2497
    env = {
2498
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2499
      }
2500
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2501
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2502
    return env, nl, nl
2503

    
2504
  def CheckPrereq(self):
2505
    """Check prerequisites.
2506

2507
    This checks that the instance is in the cluster.
2508

2509
    """
2510
    instance = self.cfg.GetInstanceInfo(
2511
      self.cfg.ExpandInstanceName(self.op.instance_name))
2512
    if instance is None:
2513
      raise errors.OpPrereqError("Instance '%s' not known" %
2514
                                 self.op.instance_name)
2515

    
2516
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2517
      raise errors.OpPrereqError("Instance's disk layout is not"
2518
                                 " network mirrored, cannot failover.")
2519

    
2520
    secondary_nodes = instance.secondary_nodes
2521
    if not secondary_nodes:
2522
      raise errors.ProgrammerError("no secondary node but using "
2523
                                   "DT_REMOTE_RAID1 template")
2524

    
2525
    target_node = secondary_nodes[0]
2526
    # check memory requirements on the secondary node
2527
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2528
                         instance.name, instance.memory)
2529

    
2530
    # check bridge existance
2531
    brlist = [nic.bridge for nic in instance.nics]
2532
    if not rpc.call_bridges_exist(target_node, brlist):
2533
      raise errors.OpPrereqError("One or more target bridges %s does not"
2534
                                 " exist on destination node '%s'" %
2535
                                 (brlist, target_node))
2536

    
2537
    self.instance = instance
2538

    
2539
  def Exec(self, feedback_fn):
2540
    """Failover an instance.
2541

2542
    The failover is done by shutting it down on its present node and
2543
    starting it on the secondary.
2544

2545
    """
2546
    instance = self.instance
2547

    
2548
    source_node = instance.primary_node
2549
    target_node = instance.secondary_nodes[0]
2550

    
2551
    feedback_fn("* checking disk consistency between source and target")
2552
    for dev in instance.disks:
2553
      # for remote_raid1, these are md over drbd
2554
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2555
        if not self.op.ignore_consistency:
2556
          raise errors.OpExecError("Disk %s is degraded on target node,"
2557
                                   " aborting failover." % dev.iv_name)
2558

    
2559
    feedback_fn("* shutting down instance on source node")
2560
    logger.Info("Shutting down instance %s on node %s" %
2561
                (instance.name, source_node))
2562

    
2563
    if not rpc.call_instance_shutdown(source_node, instance):
2564
      if self.op.ignore_consistency:
2565
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2566
                     " anyway. Please make sure node %s is down"  %
2567
                     (instance.name, source_node, source_node))
2568
      else:
2569
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2570
                                 (instance.name, source_node))
2571

    
2572
    feedback_fn("* deactivating the instance's disks on source node")
2573
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2574
      raise errors.OpExecError("Can't shut down the instance's disks.")
2575

    
2576
    instance.primary_node = target_node
2577
    # distribute new instance config to the other nodes
2578
    self.cfg.AddInstance(instance)
2579

    
2580
    feedback_fn("* activating the instance's disks on target node")
2581
    logger.Info("Starting instance %s on node %s" %
2582
                (instance.name, target_node))
2583

    
2584
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2585
                                             ignore_secondaries=True)
2586
    if not disks_ok:
2587
      _ShutdownInstanceDisks(instance, self.cfg)
2588
      raise errors.OpExecError("Can't activate the instance's disks")
2589

    
2590
    feedback_fn("* starting the instance on the target node")
2591
    if not rpc.call_instance_start(target_node, instance, None):
2592
      _ShutdownInstanceDisks(instance, self.cfg)
2593
      raise errors.OpExecError("Could not start instance %s on node %s." %
2594
                               (instance.name, target_node))
2595

    
2596

    
2597
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2598
  """Create a tree of block devices on the primary node.
2599

2600
  This always creates all devices.
2601

2602
  """
2603
  if device.children:
2604
    for child in device.children:
2605
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2606
        return False
2607

    
2608
  cfg.SetDiskID(device, node)
2609
  new_id = rpc.call_blockdev_create(node, device, device.size,
2610
                                    instance.name, True, info)
2611
  if not new_id:
2612
    return False
2613
  if device.physical_id is None:
2614
    device.physical_id = new_id
2615
  return True
2616

    
2617

    
2618
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2619
  """Create a tree of block devices on a secondary node.
2620

2621
  If this device type has to be created on secondaries, create it and
2622
  all its children.
2623

2624
  If not, just recurse to children keeping the same 'force' value.
2625

2626
  """
2627
  if device.CreateOnSecondary():
2628
    force = True
2629
  if device.children:
2630
    for child in device.children:
2631
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2632
                                        child, force, info):
2633
        return False
2634

    
2635
  if not force:
2636
    return True
2637
  cfg.SetDiskID(device, node)
2638
  new_id = rpc.call_blockdev_create(node, device, device.size,
2639
                                    instance.name, False, info)
2640
  if not new_id:
2641
    return False
2642
  if device.physical_id is None:
2643
    device.physical_id = new_id
2644
  return True
2645

    
2646

    
2647
def _GenerateUniqueNames(cfg, exts):
2648
  """Generate a suitable LV name.
2649

2650
  This will generate a logical volume name for the given instance.
2651

2652
  """
2653
  results = []
2654
  for val in exts:
2655
    new_id = cfg.GenerateUniqueID()
2656
    results.append("%s%s" % (new_id, val))
2657
  return results
2658

    
2659

    
2660
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2661
  """Generate a drbd device complete with its children.
2662

2663
  """
2664
  port = cfg.AllocatePort()
2665
  vgname = cfg.GetVGName()
2666
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2667
                          logical_id=(vgname, names[0]))
2668
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2669
                          logical_id=(vgname, names[1]))
2670
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2671
                          logical_id = (primary, secondary, port),
2672
                          children = [dev_data, dev_meta])
2673
  return drbd_dev
2674

    
2675

    
2676
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2677
  """Generate a drbd8 device complete with its children.
2678

2679
  """
2680
  port = cfg.AllocatePort()
2681
  vgname = cfg.GetVGName()
2682
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2683
                          logical_id=(vgname, names[0]))
2684
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2685
                          logical_id=(vgname, names[1]))
2686
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2687
                          logical_id = (primary, secondary, port),
2688
                          children = [dev_data, dev_meta],
2689
                          iv_name=iv_name)
2690
  return drbd_dev
2691

    
2692

    
2693
def _GenerateDiskTemplate(cfg, template_name,
2694
                          instance_name, primary_node,
2695
                          secondary_nodes, disk_sz, swap_sz):
2696
  """Generate the entire disk layout for a given template type.
2697

2698
  """
2699
  #TODO: compute space requirements
2700

    
2701
  vgname = cfg.GetVGName()
2702
  if template_name == constants.DT_DISKLESS:
2703
    disks = []
2704
  elif template_name == constants.DT_PLAIN:
2705
    if len(secondary_nodes) != 0:
2706
      raise errors.ProgrammerError("Wrong template configuration")
2707

    
2708
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2709
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2710
                           logical_id=(vgname, names[0]),
2711
                           iv_name = "sda")
2712
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2713
                           logical_id=(vgname, names[1]),
2714
                           iv_name = "sdb")
2715
    disks = [sda_dev, sdb_dev]
2716
  elif template_name == constants.DT_LOCAL_RAID1:
2717
    if len(secondary_nodes) != 0:
2718
      raise errors.ProgrammerError("Wrong template configuration")
2719

    
2720

    
2721
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2722
                                       ".sdb_m1", ".sdb_m2"])
2723
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2724
                              logical_id=(vgname, names[0]))
2725
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2726
                              logical_id=(vgname, names[1]))
2727
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2728
                              size=disk_sz,
2729
                              children = [sda_dev_m1, sda_dev_m2])
2730
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2731
                              logical_id=(vgname, names[2]))
2732
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2733
                              logical_id=(vgname, names[3]))
2734
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2735
                              size=swap_sz,
2736
                              children = [sdb_dev_m1, sdb_dev_m2])
2737
    disks = [md_sda_dev, md_sdb_dev]
2738
  elif template_name == constants.DT_REMOTE_RAID1:
2739
    if len(secondary_nodes) != 1:
2740
      raise errors.ProgrammerError("Wrong template configuration")
2741
    remote_node = secondary_nodes[0]
2742
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2743
                                       ".sdb_data", ".sdb_meta"])
2744
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2745
                                         disk_sz, names[0:2])
2746
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2747
                              children = [drbd_sda_dev], size=disk_sz)
2748
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2749
                                         swap_sz, names[2:4])
2750
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2751
                              children = [drbd_sdb_dev], size=swap_sz)
2752
    disks = [md_sda_dev, md_sdb_dev]
2753
  elif template_name == constants.DT_DRBD8:
2754
    if len(secondary_nodes) != 1:
2755
      raise errors.ProgrammerError("Wrong template configuration")
2756
    remote_node = secondary_nodes[0]
2757
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2758
                                       ".sdb_data", ".sdb_meta"])
2759
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2760
                                         disk_sz, names[0:2], "sda")
2761
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2762
                                         swap_sz, names[2:4], "sdb")
2763
    disks = [drbd_sda_dev, drbd_sdb_dev]
2764
  else:
2765
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2766
  return disks
2767

    
2768

    
2769
def _GetInstanceInfoText(instance):
2770
  """Compute that text that should be added to the disk's metadata.
2771

2772
  """
2773
  return "originstname+%s" % instance.name
2774

    
2775

    
2776
def _CreateDisks(cfg, instance):
2777
  """Create all disks for an instance.
2778

2779
  This abstracts away some work from AddInstance.
2780

2781
  Args:
2782
    instance: the instance object
2783

2784
  Returns:
2785
    True or False showing the success of the creation process
2786

2787
  """
2788
  info = _GetInstanceInfoText(instance)
2789

    
2790
  for device in instance.disks:
2791
    logger.Info("creating volume %s for instance %s" %
2792
              (device.iv_name, instance.name))
2793
    #HARDCODE
2794
    for secondary_node in instance.secondary_nodes:
2795
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2796
                                        device, False, info):
2797
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2798
                     (device.iv_name, device, secondary_node))
2799
        return False
2800
    #HARDCODE
2801
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2802
                                    instance, device, info):
2803
      logger.Error("failed to create volume %s on primary!" %
2804
                   device.iv_name)
2805
      return False
2806
  return True
2807

    
2808

    
2809
def _RemoveDisks(instance, cfg):
2810
  """Remove all disks for an instance.
2811

2812
  This abstracts away some work from `AddInstance()` and
2813
  `RemoveInstance()`. Note that in case some of the devices couldn't
2814
  be removed, the removal will continue with the other ones (compare
2815
  with `_CreateDisks()`).
2816

2817
  Args:
2818
    instance: the instance object
2819

2820
  Returns:
2821
    True or False showing the success of the removal proces
2822

2823
  """
2824
  logger.Info("removing block devices for instance %s" % instance.name)
2825

    
2826
  result = True
2827
  for device in instance.disks:
2828
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2829
      cfg.SetDiskID(disk, node)
2830
      if not rpc.call_blockdev_remove(node, disk):
2831
        logger.Error("could not remove block device %s on node %s,"
2832
                     " continuing anyway" %
2833
                     (device.iv_name, node))
2834
        result = False
2835
  return result
2836

    
2837

    
2838
class LUCreateInstance(LogicalUnit):
2839
  """Create an instance.
2840

2841
  """
2842
  HPATH = "instance-add"
2843
  HTYPE = constants.HTYPE_INSTANCE
2844
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2845
              "disk_template", "swap_size", "mode", "start", "vcpus",
2846
              "wait_for_sync", "ip_check", "mac"]
2847

    
2848
  def BuildHooksEnv(self):
2849
    """Build hooks env.
2850

2851
    This runs on master, primary and secondary nodes of the instance.
2852

2853
    """
2854
    env = {
2855
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2856
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2857
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2858
      "INSTANCE_ADD_MODE": self.op.mode,
2859
      }
2860
    if self.op.mode == constants.INSTANCE_IMPORT:
2861
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2862
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2863
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2864

    
2865
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2866
      primary_node=self.op.pnode,
2867
      secondary_nodes=self.secondaries,
2868
      status=self.instance_status,
2869
      os_type=self.op.os_type,
2870
      memory=self.op.mem_size,
2871
      vcpus=self.op.vcpus,
2872
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2873
    ))
2874

    
2875
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2876
          self.secondaries)
2877
    return env, nl, nl
2878

    
2879

    
2880
  def CheckPrereq(self):
2881
    """Check prerequisites.
2882

2883
    """
2884
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2885
      if not hasattr(self.op, attr):
2886
        setattr(self.op, attr, None)
2887

    
2888
    if self.op.mode not in (constants.INSTANCE_CREATE,
2889
                            constants.INSTANCE_IMPORT):
2890
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2891
                                 self.op.mode)
2892

    
2893
    if self.op.mode == constants.INSTANCE_IMPORT:
2894
      src_node = getattr(self.op, "src_node", None)
2895
      src_path = getattr(self.op, "src_path", None)
2896
      if src_node is None or src_path is None:
2897
        raise errors.OpPrereqError("Importing an instance requires source"
2898
                                   " node and path options")
2899
      src_node_full = self.cfg.ExpandNodeName(src_node)
2900
      if src_node_full is None:
2901
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2902
      self.op.src_node = src_node = src_node_full
2903

    
2904
      if not os.path.isabs(src_path):
2905
        raise errors.OpPrereqError("The source path must be absolute")
2906

    
2907
      export_info = rpc.call_export_info(src_node, src_path)
2908

    
2909
      if not export_info:
2910
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2911

    
2912
      if not export_info.has_section(constants.INISECT_EXP):
2913
        raise errors.ProgrammerError("Corrupted export config")
2914

    
2915
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2916
      if (int(ei_version) != constants.EXPORT_VERSION):
2917
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2918
                                   (ei_version, constants.EXPORT_VERSION))
2919

    
2920
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2921
        raise errors.OpPrereqError("Can't import instance with more than"
2922
                                   " one data disk")
2923

    
2924
      # FIXME: are the old os-es, disk sizes, etc. useful?
2925
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2926
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2927
                                                         'disk0_dump'))
2928
      self.src_image = diskimage
2929
    else: # INSTANCE_CREATE
2930
      if getattr(self.op, "os_type", None) is None:
2931
        raise errors.OpPrereqError("No guest OS specified")
2932

    
2933
    # check primary node
2934
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2935
    if pnode is None:
2936
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2937
                                 self.op.pnode)
2938
    self.op.pnode = pnode.name
2939
    self.pnode = pnode
2940
    self.secondaries = []
2941
    # disk template and mirror node verification
2942
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2943
      raise errors.OpPrereqError("Invalid disk template name")
2944

    
2945
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2946
      if getattr(self.op, "snode", None) is None:
2947
        raise errors.OpPrereqError("The networked disk templates need"
2948
                                   " a mirror node")
2949

    
2950
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2951
      if snode_name is None:
2952
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2953
                                   self.op.snode)
2954
      elif snode_name == pnode.name:
2955
        raise errors.OpPrereqError("The secondary node cannot be"
2956
                                   " the primary node.")
2957
      self.secondaries.append(snode_name)
2958

    
2959
    # Required free disk space as a function of disk and swap space
2960
    req_size_dict = {
2961
      constants.DT_DISKLESS: None,
2962
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2963
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2964
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2965
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2966
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2967
    }
2968

    
2969
    if self.op.disk_template not in req_size_dict:
2970
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2971
                                   " is unknown" %  self.op.disk_template)
2972

    
2973
    req_size = req_size_dict[self.op.disk_template]
2974

    
2975
    # Check lv size requirements
2976
    if req_size is not None:
2977
      nodenames = [pnode.name] + self.secondaries
2978
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2979
      for node in nodenames:
2980
        info = nodeinfo.get(node, None)
2981
        if not info:
2982
          raise errors.OpPrereqError("Cannot get current information"
2983
                                     " from node '%s'" % nodeinfo)
2984
        vg_free = info.get('vg_free', None)
2985
        if not isinstance(vg_free, int):
2986
          raise errors.OpPrereqError("Can't compute free disk space on"
2987
                                     " node %s" % node)
2988
        if req_size > info['vg_free']:
2989
          raise errors.OpPrereqError("Not enough disk space on target node %s."
2990
                                     " %d MB available, %d MB required" %
2991
                                     (node, info['vg_free'], req_size))
2992

    
2993
    # os verification
2994
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2995
    if not os_obj:
2996
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2997
                                 " primary node"  % self.op.os_type)
2998

    
2999
    if self.op.kernel_path == constants.VALUE_NONE:
3000
      raise errors.OpPrereqError("Can't set instance kernel to none")
3001

    
3002
    # instance verification
3003
    hostname1 = utils.HostInfo(self.op.instance_name)
3004

    
3005
    self.op.instance_name = instance_name = hostname1.name
3006
    instance_list = self.cfg.GetInstanceList()
3007
    if instance_name in instance_list:
3008
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3009
                                 instance_name)
3010

    
3011
    ip = getattr(self.op, "ip", None)
3012
    if ip is None or ip.lower() == "none":
3013
      inst_ip = None
3014
    elif ip.lower() == "auto":
3015
      inst_ip = hostname1.ip
3016
    else:
3017
      if not utils.IsValidIP(ip):
3018
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3019
                                   " like a valid IP" % ip)
3020
      inst_ip = ip
3021
    self.inst_ip = inst_ip
3022

    
3023
    if self.op.start and not self.op.ip_check:
3024
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3025
                                 " adding an instance in start mode")
3026

    
3027
    if self.op.ip_check:
3028
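      # probe the candidate IP on the node daemon port; if anything answers,
      # the address is already taken by another host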
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3029
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3030
                                   (hostname1.ip, instance_name))
3031

    
3032
    # MAC address verification
3033
    if self.op.mac != "auto":
3034
      if not utils.IsValidMac(self.op.mac.lower()):
3035
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3036
                                   self.op.mac)
3037

    
3038
    # bridge verification
3039
    bridge = getattr(self.op, "bridge", None)
3040
    if bridge is None:
3041
      self.op.bridge = self.cfg.GetDefBridge()
3042
    else:
3043
      self.op.bridge = bridge
3044

    
3045
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3046
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3047
                                 " destination node '%s'" %
3048
                                 (self.op.bridge, pnode.name))
3049

    
3050
    # boot order verification
3051
    if self.op.hvm_boot_order is not None:
3052
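      # strip("acdn") removes any leading/trailing valid boot-device letters;
      # a non-empty remainder means an invalid character was supplied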
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3053
        raise errors.OpPrereqError("invalid boot order specified,"
3054
                                   " must be one or more of [acdn]")
3055

    
3056
    if self.op.start:
3057
      self.instance_status = 'up'
3058
    else:
3059
      self.instance_status = 'down'
3060

    
3061
  def Exec(self, feedback_fn):
3062
    """Create and add the instance to the cluster.
3063

3064
    """
3065
    instance = self.op.instance_name
3066
    pnode_name = self.pnode.name
3067

    
3068
    if self.op.mac == "auto":
3069
      mac_address = self.cfg.GenerateMAC()
3070
    else:
3071
      mac_address = self.op.mac
3072

    
3073
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3074
    if self.inst_ip is not None:
3075
      nic.ip = self.inst_ip
3076

    
3077
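    # hypervisors listed in constants.HTS_REQ_PORT need a cluster-wide unique
    # network port reserved at creation time; others leave it unset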
    ht_kind = self.sstore.GetHypervisorType()
3078
    if ht_kind in constants.HTS_REQ_PORT:
3079
      network_port = self.cfg.AllocatePort()
3080
    else:
3081
      network_port = None
3082

    
3083
    disks = _GenerateDiskTemplate(self.cfg,
3084
                                  self.op.disk_template,
3085
                                  instance, pnode_name,
3086
                                  self.secondaries, self.op.disk_size,
3087
                                  self.op.swap_size)
3088

    
3089
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3090
                            primary_node=pnode_name,
3091
                            memory=self.op.mem_size,
3092
                            vcpus=self.op.vcpus,
3093
                            nics=[nic], disks=disks,
3094
                            disk_template=self.op.disk_template,
3095
                            status=self.instance_status,
3096
                            network_port=network_port,
3097
                            kernel_path=self.op.kernel_path,
3098
                            initrd_path=self.op.initrd_path,
3099
                            hvm_boot_order=self.op.hvm_boot_order,
3100
                            )
3101

    
3102
    feedback_fn("* creating instance disks...")
3103
    if not _CreateDisks(self.cfg, iobj):
3104
      _RemoveDisks(iobj, self.cfg)
3105
      raise errors.OpExecError("Device creation failed, reverting...")
3106

    
3107
    feedback_fn("adding instance %s to cluster config" % instance)
3108

    
3109
    self.cfg.AddInstance(iobj)
3110

    
3111
    if self.op.wait_for_sync:
3112
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3113
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3114
      # make sure the disks are not degraded (still sync-ing is ok)
3115
      time.sleep(15)
3116
      feedback_fn("* checking mirrors status")
3117
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3118
    else:
3119
      disk_abort = False
3120

    
3121
    if disk_abort:
3122
      _RemoveDisks(iobj, self.cfg)
3123
      self.cfg.RemoveInstance(iobj.name)
3124
      raise errors.OpExecError("There are some degraded disks for"
3125
                               " this instance")
3126

    
3127
    feedback_fn("creating os for instance %s on node %s" %
3128
                (instance, pnode_name))
3129

    
3130
    if iobj.disk_template != constants.DT_DISKLESS:
3131
      if self.op.mode == constants.INSTANCE_CREATE:
3132
        feedback_fn("* running the instance OS create scripts...")
3133
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3134
          raise errors.OpExecError("could not add os for instance %s"
3135
                                   " on node %s" %
3136
                                   (instance, pnode_name))
3137

    
3138
      elif self.op.mode == constants.INSTANCE_IMPORT:
3139
        feedback_fn("* running the instance OS import scripts...")
3140
        src_node = self.op.src_node
3141
        src_image = self.src_image
3142
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3143
                                                src_node, src_image):
3144
          raise errors.OpExecError("Could not import os for instance"
3145
                                   " %s on node %s" %
3146
                                   (instance, pnode_name))
3147
      else:
3148
        # also checked in the prereq part
3149
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3150
                                     % self.op.mode)
3151

    
3152
    if self.op.start:
3153
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3154
      feedback_fn("* starting instance...")
3155
      if not rpc.call_instance_start(pnode_name, iobj, None):
3156
        raise errors.OpExecError("Could not start instance")
3157

    
3158

    
3159
class LUConnectConsole(NoHooksLU):
3160
  """Connect to an instance's console.
3161

3162
  This is somewhat special in that it returns the command line that
3163
  you need to run on the master node in order to connect to the
3164
  console.
3165

3166
  """
3167
  _OP_REQP = ["instance_name"]
3168

    
3169
  def CheckPrereq(self):
3170
    """Check prerequisites.
3171

3172
    This checks that the instance is in the cluster.
3173

3174
    """
3175
    instance = self.cfg.GetInstanceInfo(
3176
      self.cfg.ExpandInstanceName(self.op.instance_name))
3177
    if instance is None:
3178
      raise errors.OpPrereqError("Instance '%s' not known" %
3179
                                 self.op.instance_name)
3180
    self.instance = instance
3181

    
3182
  def Exec(self, feedback_fn):
3183
    """Connect to the console of an instance
3184

3185
    """
3186
    instance = self.instance
3187
    node = instance.primary_node
3188

    
3189
    node_insts = rpc.call_instance_list([node])[node]
3190
    if node_insts is False:
3191
      raise errors.OpExecError("Can't connect to node %s." % node)
3192

    
3193
    if instance.name not in node_insts:
3194
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3195

    
3196
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3197

    
3198
    hyper = hypervisor.GetHypervisor()
3199
    console_cmd = hyper.GetShellCommandForConsole(instance)
3200

    
3201
    # build ssh cmdline
3202
    cmd = self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3203
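    # return both the program name and the full argument vector of the
    # command line to be run on the master node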
    return cmd[0], cmd
3204

    
3205

    
3206
class LUReplaceDisks(LogicalUnit):
3207
  """Replace the disks of an instance.
3208

3209
  """
3210
  HPATH = "mirrors-replace"
3211
  HTYPE = constants.HTYPE_INSTANCE
3212
  _OP_REQP = ["instance_name", "mode", "disks"]
3213

    
3214
  def BuildHooksEnv(self):
3215
    """Build hooks env.
3216

3217
    This runs on the master, the primary and all the secondaries.
3218

3219
    """
3220
    env = {
3221
      "MODE": self.op.mode,
3222
      "NEW_SECONDARY": self.op.remote_node,
3223
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3224
      }
3225
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3226
    nl = [
3227
      self.sstore.GetMasterNode(),
3228
      self.instance.primary_node,
3229
      ]
3230
    if self.op.remote_node is not None:
3231
      nl.append(self.op.remote_node)
3232
    return env, nl, nl
3233

    
3234
  def CheckPrereq(self):
3235
    """Check prerequisites.
3236

3237
    This checks that the instance is in the cluster.
3238

3239
    """
3240
    instance = self.cfg.GetInstanceInfo(
3241
      self.cfg.ExpandInstanceName(self.op.instance_name))
3242
    if instance is None:
3243
      raise errors.OpPrereqError("Instance '%s' not known" %
3244
                                 self.op.instance_name)
3245
    self.instance = instance
3246
    self.op.instance_name = instance.name
3247

    
3248
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3249
      raise errors.OpPrereqError("Instance's disk layout is not"
3250
                                 " network mirrored.")
3251

    
3252
    if len(instance.secondary_nodes) != 1:
3253
      raise errors.OpPrereqError("The instance has a strange layout,"
3254
                                 " expected one secondary but found %d" %
3255
                                 len(instance.secondary_nodes))
3256

    
3257
    self.sec_node = instance.secondary_nodes[0]
3258

    
3259
    remote_node = getattr(self.op, "remote_node", None)
3260
    if remote_node is not None:
3261
      remote_node = self.cfg.ExpandNodeName(remote_node)
3262
      if remote_node is None:
3263
        raise errors.OpPrereqError("Node '%s' not known" %
3264
                                   self.op.remote_node)
3265
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3266
    else:
3267
      self.remote_node_info = None
3268
    if remote_node == instance.primary_node:
3269
      raise errors.OpPrereqError("The specified node is the primary node of"
3270
                                 " the instance.")
3271
    elif remote_node == self.sec_node:
3272
      if self.op.mode == constants.REPLACE_DISK_SEC:
3273
        # this is for DRBD8, where we can't execute the same mode of
3274
        # replacement as for drbd7 (no different port allocated)
3275
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3276
                                   " replacement")
3277
      # the user gave the current secondary, switch to
3278
      # 'no-replace-secondary' mode for drbd7
3279
      remote_node = None
3280
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3281
        self.op.mode != constants.REPLACE_DISK_ALL):
3282
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3283
                                 " disks replacement, not individual ones")
3284
    if instance.disk_template == constants.DT_DRBD8:
3285
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3286
          remote_node is not None):
3287
        # switch to replace secondary mode
3288
        self.op.mode = constants.REPLACE_DISK_SEC
3289

    
3290
      if self.op.mode == constants.REPLACE_DISK_ALL:
3291
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3292
                                   " secondary disk replacement, not"
3293
                                   " both at once")
3294
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3295
        if remote_node is not None:
3296
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3297
                                     " the secondary while doing a primary"
3298
                                     " node disk replacement")
3299
        self.tgt_node = instance.primary_node
3300
        self.oth_node = instance.secondary_nodes[0]
3301
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3302
        self.new_node = remote_node # this can be None, in which case
3303
                                    # we don't change the secondary
3304
        self.tgt_node = instance.secondary_nodes[0]
3305
        self.oth_node = instance.primary_node
3306
      else:
3307
        raise errors.ProgrammerError("Unhandled disk replace mode")
3308

    
3309
    for name in self.op.disks:
3310
      if instance.FindDisk(name) is None:
3311
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3312
                                   (name, instance.name))
3313
    self.op.remote_node = remote_node
3314

    
3315
  def _ExecRR1(self, feedback_fn):
3316
    """Replace the disks of an instance.
3317

3318
    """
3319
    instance = self.instance
3320
    iv_names = {}
3321
    # start of work
3322
    if self.op.remote_node is None:
3323
      remote_node = self.sec_node
3324
    else:
3325
      remote_node = self.op.remote_node
3326
    cfg = self.cfg
3327
    for dev in instance.disks:
3328
      size = dev.size
3329
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3330
      names = _GenerateUniqueNames(cfg, lv_names)
3331
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3332
                                       remote_node, size, names)
3333
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3334
      logger.Info("adding new mirror component on secondary for %s" %
3335
                  dev.iv_name)
3336
      #HARDCODE
3337
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3338
                                        new_drbd, False,
3339
                                        _GetInstanceInfoText(instance)):
3340
        raise errors.OpExecError("Failed to create new component on secondary"
3341
                                 " node %s. Full abort, cleanup manually!" %
3342
                                 remote_node)
3343

    
3344
      logger.Info("adding new mirror component on primary")
3345
      #HARDCODE
3346
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3347
                                      instance, new_drbd,
3348
                                      _GetInstanceInfoText(instance)):
3349
        # remove secondary dev
3350
        cfg.SetDiskID(new_drbd, remote_node)
3351
        rpc.call_blockdev_remove(remote_node, new_drbd)
3352
        raise errors.OpExecError("Failed to create volume on primary!"
3353
                                 " Full abort, cleanup manually!!")
3354

    
3355
      # the device exists now
3356
      # call the primary node to add the mirror to md
3357
      logger.Info("adding new mirror component to md")
3358
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3359
                                           [new_drbd]):
3360
        logger.Error("Can't add mirror compoment to md!")
3361
        cfg.SetDiskID(new_drbd, remote_node)
3362
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3363
          logger.Error("Can't rollback on secondary")
3364
        cfg.SetDiskID(new_drbd, instance.primary_node)
3365
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3366
          logger.Error("Can't rollback on primary")
3367
        raise errors.OpExecError("Full abort, cleanup manually!!")
3368

    
3369
      dev.children.append(new_drbd)
3370
      cfg.AddInstance(instance)
3371

    
3372
    # this can fail as the old devices are degraded and _WaitForSync
3373
    # does a combined result over all disks, so we don't check its
3374
    # return value
3375
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3376

    
3377
    # so check manually all the devices
3378
    for name in iv_names:
3379
      dev, child, new_drbd = iv_names[name]
3380
      cfg.SetDiskID(dev, instance.primary_node)
3381
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3382
      if is_degr:
3383
        raise errors.OpExecError("MD device %s is degraded!" % name)
3384
      cfg.SetDiskID(new_drbd, instance.primary_node)
3385
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3386
      if is_degr:
3387
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3388

    
3389
    for name in iv_names:
3390
      dev, child, new_drbd = iv_names[name]
3391
      logger.Info("remove mirror %s component" % name)
3392
      cfg.SetDiskID(dev, instance.primary_node)
3393
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3394
                                              dev, [child]):
3395
        logger.Error("Can't remove child from mirror, aborting"
3396
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3397
        continue
3398

    
3399
      for node in child.logical_id[:2]:
3400
        logger.Info("remove child device on %s" % node)
3401
        cfg.SetDiskID(child, node)
3402
        if not rpc.call_blockdev_remove(node, child):
3403
          logger.Error("Warning: failed to remove device from node %s,"
3404
                       " continuing operation." % node)
3405

    
3406
      dev.children.remove(child)
3407

    
3408
      cfg.AddInstance(instance)
3409

    
3410
  def _ExecD8DiskOnly(self, feedback_fn):
3411
    """Replace a disk on the primary or secondary for dbrd8.
3412

3413
    The algorithm for replace is quite complicated:
3414
      - for each disk to be replaced:
3415
        - create new LVs on the target node with unique names
3416
        - detach old LVs from the drbd device
3417
        - rename old LVs to name_replaced.<time_t>
3418
        - rename new LVs to old LVs
3419
        - attach the new LVs (with the old names now) to the drbd device
3420
      - wait for sync across all devices
3421
      - for each modified disk:
3422
        - remove old LVs (which have the name name_replaced.<time_t>)
3423

3424
    Failures are not very well handled.
3425

3426
    """
3427
    steps_total = 6
3428
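    # the six steps below: check device existence, check peer consistency,
    # allocate new storage, change the drbd configuration, wait for sync,
    # and remove the old storage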
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3429
    instance = self.instance
3430
    iv_names = {}
3431
    vgname = self.cfg.GetVGName()
3432
    # start of work
3433
    cfg = self.cfg
3434
    tgt_node = self.tgt_node
3435
    oth_node = self.oth_node
3436

    
3437
    # Step: check device activation
3438
    self.proc.LogStep(1, steps_total, "check device existence")
3439
    info("checking volume groups")
3440
    my_vg = cfg.GetVGName()
3441
    results = rpc.call_vg_list([oth_node, tgt_node])
3442
    if not results:
3443
      raise errors.OpExecError("Can't list volume groups on the nodes")
3444
    for node in oth_node, tgt_node:
3445
      res = results.get(node, False)
3446
      if not res or my_vg not in res:
3447
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3448
                                 (my_vg, node))
3449
    for dev in instance.disks:
3450
      if not dev.iv_name in self.op.disks:
3451
        continue
3452
      for node in tgt_node, oth_node:
3453
        info("checking %s on %s" % (dev.iv_name, node))
3454
        cfg.SetDiskID(dev, node)
3455
        if not rpc.call_blockdev_find(node, dev):
3456
          raise errors.OpExecError("Can't find device %s on node %s" %
3457
                                   (dev.iv_name, node))
3458

    
3459
    # Step: check other node consistency
3460
    self.proc.LogStep(2, steps_total, "check peer consistency")
3461
    for dev in instance.disks:
3462
      if not dev.iv_name in self.op.disks:
3463
        continue
3464
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3465
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3466
                                   oth_node==instance.primary_node):
3467
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3468
                                 " to replace disks on this node (%s)" %
3469
                                 (oth_node, tgt_node))
3470

    
3471
    # Step: create new storage
3472
    self.proc.LogStep(3, steps_total, "allocate new storage")
3473
    for dev in instance.disks:
3474
      if not dev.iv_name in self.op.disks:
3475
        continue
3476
      size = dev.size
3477
      cfg.SetDiskID(dev, tgt_node)
3478
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3479
      names = _GenerateUniqueNames(cfg, lv_names)
3480
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3481
                             logical_id=(vgname, names[0]))
3482
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3483
                             logical_id=(vgname, names[1]))
3484
      new_lvs = [lv_data, lv_meta]
3485
      old_lvs = dev.children
3486
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3487
      info("creating new local storage on %s for %s" %
3488
           (tgt_node, dev.iv_name))
3489
      # since we *always* want to create this LV, we use the
3490
      # _Create...OnPrimary (which forces the creation), even if we
3491
      # are talking about the secondary node
3492
      for new_lv in new_lvs:
3493
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3494
                                        _GetInstanceInfoText(instance)):
3495
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3496
                                   " node '%s'" %
3497
                                   (new_lv.logical_id[1], tgt_node))
3498

    
3499
    # Step: for each lv, detach+rename*2+attach
3500
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3501
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3502
      info("detaching %s drbd from local storage" % dev.iv_name)
3503
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3504
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3505
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3506
      #dev.children = []
3507
      #cfg.Update(instance)
3508

    
3509
      # ok, we created the new LVs, so now we know we have the needed
3510
      # storage; as such, we proceed on the target node to rename
3511
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3512
      # using the assumption that logical_id == physical_id (which in
3513
      # turn is the unique_id on that node)
3514

    
3515
      # FIXME(iustin): use a better name for the replaced LVs
3516
      temp_suffix = int(time.time())
3517
      ren_fn = lambda d, suff: (d.physical_id[0],
3518
                                d.physical_id[1] + "_replaced-%s" % suff)
3519
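      # illustrative example (hypothetical names): ('xenvg', 'uuid.sda_data')
      # would be renamed to ('xenvg', 'uuid.sda_data_replaced-1199145600')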
      # build the rename list based on what LVs exist on the node
3520
      rlist = []
3521
      for to_ren in old_lvs:
3522
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3523
        if find_res is not None: # device exists
3524
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3525

    
3526
      info("renaming the old LVs on the target node")
3527
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3528
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3529
      # now we rename the new LVs to the old LVs
3530
      info("renaming the new LVs on the target node")
3531
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3532
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3533
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3534

    
3535
      for old, new in zip(old_lvs, new_lvs):
3536
        new.logical_id = old.logical_id
3537
        cfg.SetDiskID(new, tgt_node)
3538

    
3539
      for disk in old_lvs:
3540
        disk.logical_id = ren_fn(disk, temp_suffix)
3541
        cfg.SetDiskID(disk, tgt_node)
3542

    
3543
      # now that the new lvs have the old name, we can add them to the device
3544
      info("adding new mirror component on %s" % tgt_node)
3545
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3546
        for new_lv in new_lvs:
3547
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3548
            warning("Can't rollback device %s", hint="manually cleanup unused"
3549
                    " logical volumes")
3550
        raise errors.OpExecError("Can't add local storage to drbd")
3551

    
3552
      dev.children = new_lvs
3553
      cfg.Update(instance)
3554

    
3555
    # Step: wait for sync
3556

    
3557
    # this can fail as the old devices are degraded and _WaitForSync
3558
    # does a combined result over all disks, so we don't check its
3559
    # return value
3560
    self.proc.LogStep(5, steps_total, "sync devices")
3561
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3562

    
3563
    # so check manually all the devices
3564
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3565
      cfg.SetDiskID(dev, instance.primary_node)
3566
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3567
      if is_degr:
3568
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3569

    
3570
    # Step: remove old storage
3571
    self.proc.LogStep(6, steps_total, "removing old storage")
3572
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3573
      info("remove logical volumes for %s" % name)
3574
      for lv in old_lvs:
3575
        cfg.SetDiskID(lv, tgt_node)
3576
        if not rpc.call_blockdev_remove(tgt_node, lv):
3577
          warning("Can't remove old LV", hint="manually remove unused LVs")
3578
          continue
3579

    
3580
  def _ExecD8Secondary(self, feedback_fn):
3581
    """Replace the secondary node for drbd8.
3582

3583
    The algorithm for replace is quite complicated:
3584
      - for all disks of the instance:
3585
        - create new LVs on the new node with same names
3586
        - shutdown the drbd device on the old secondary
3587
        - disconnect the drbd network on the primary
3588
        - create the drbd device on the new secondary
3589
        - network attach the drbd on the primary, using an artifice:
3590
          the drbd code for Attach() will connect to the network if it
3591
          finds a device which is connected to the good local disks but
3592
          not network enabled
3593
      - wait for sync across all devices
3594
      - remove all disks from the old secondary
3595

3596
    Failures are not very well handled.
3597

3598
    """
3599
    steps_total = 6
3600
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3601
    instance = self.instance
3602
    iv_names = {}
3603
    vgname = self.cfg.GetVGName()
3604
    # start of work
3605
    cfg = self.cfg
3606
    old_node = self.tgt_node
3607
    new_node = self.new_node
3608
    pri_node = instance.primary_node
3609

    
3610
    # Step: check device activation
3611
    self.proc.LogStep(1, steps_total, "check device existence")
3612
    info("checking volume groups")
3613
    my_vg = cfg.GetVGName()
3614
    results = rpc.call_vg_list([pri_node, new_node])
3615
    if not results:
3616
      raise errors.OpExecError("Can't list volume groups on the nodes")
3617
    for node in pri_node, new_node:
3618
      res = results.get(node, False)
3619
      if not res or my_vg not in res:
3620
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3621
                                 (my_vg, node))
3622
    for dev in instance.disks:
3623
      if not dev.iv_name in self.op.disks:
3624
        continue
3625
      info("checking %s on %s" % (dev.iv_name, pri_node))
3626
      cfg.SetDiskID(dev, pri_node)
3627
      if not rpc.call_blockdev_find(pri_node, dev):
3628
        raise errors.OpExecError("Can't find device %s on node %s" %
3629
                                 (dev.iv_name, pri_node))
3630

    
3631
    # Step: check other node consistency
3632
    self.proc.LogStep(2, steps_total, "check peer consistency")
3633
    for dev in instance.disks:
3634
      if not dev.iv_name in self.op.disks:
3635
        continue
3636
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3637
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3638
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3639
                                 " unsafe to replace the secondary" %
3640
                                 pri_node)
3641

    
3642
    # Step: create new storage
3643
    self.proc.LogStep(3, steps_total, "allocate new storage")
3644
    for dev in instance.disks:
3645
      size = dev.size
3646
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3647
      # since we *always* want to create this LV, we use the
3648
      # _Create...OnPrimary (which forces the creation), even if we
3649
      # are talking about the secondary node
3650
      for new_lv in dev.children:
3651
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3652
                                        _GetInstanceInfoText(instance)):
3653
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3654
                                   " node '%s'" %
3655
                                   (new_lv.logical_id[1], new_node))
3656

    
3657
      iv_names[dev.iv_name] = (dev, dev.children)
3658

    
3659
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3660
    for dev in instance.disks:
3661
      size = dev.size
3662
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3663
      # create new devices on new_node
3664
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3665
                              logical_id=(pri_node, new_node,
3666
                                          dev.logical_id[2]),
3667
                              children=dev.children)
3668
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3669
                                        new_drbd, False,
3670
                                      _GetInstanceInfoText(instance)):
3671
        raise errors.OpExecError("Failed to create new DRBD on"
3672
                                 " node '%s'" % new_node)
3673

    
3674
    for dev in instance.disks:
3675
      # we have new devices, shutdown the drbd on the old secondary
3676
      info("shutting down drbd for %s on old node" % dev.iv_name)
3677
      cfg.SetDiskID(dev, old_node)
3678
      if not rpc.call_blockdev_shutdown(old_node, dev):
3679
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3680
                hint="Please cleanup this device manually as soon as possible")
3681

    
3682
    info("detaching primary drbds from the network (=> standalone)")
3683
    done = 0
3684
    for dev in instance.disks:
3685
      cfg.SetDiskID(dev, pri_node)
3686
      # set the physical (unique in bdev terms) id to None, meaning
3687
      # detach from network
3688
      dev.physical_id = (None,) * len(dev.physical_id)
3689
      # and 'find' the device, which will 'fix' it to match the
3690
      # standalone state
3691
      if rpc.call_blockdev_find(pri_node, dev):
3692
        done += 1
3693
      else:
3694
        warning("Failed to detach drbd %s from network, unusual case" %
3695
                dev.iv_name)
3696

    
3697
    if not done:
3698
      # no detaches succeeded (very unlikely)
3699
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3700

    
3701
    # if we managed to detach at least one, we update all the disks of
3702
    # the instance to point to the new secondary
3703
    info("updating instance configuration")
3704
    for dev in instance.disks:
3705
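      # swap in the (primary, new secondary) node pair while preserving the
      # rest of the logical_id unchanged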
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3706
      cfg.SetDiskID(dev, pri_node)
3707
    cfg.Update(instance)
3708

    
3709
    # and now perform the drbd attach
3710
    info("attaching primary drbds to new secondary (standalone => connected)")
3711
    failures = []
3712
    for dev in instance.disks:
3713
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3714
      # since the attach is smart, it's enough to 'find' the device,
3715
      # it will automatically activate the network, if the physical_id
3716
      # is correct
3717
      cfg.SetDiskID(dev, pri_node)
3718
      if not rpc.call_blockdev_find(pri_node, dev):
3719
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3720
                "please do a gnt-instance info to see the status of disks")
3721

    
3722
    # this can fail as the old devices are degraded and _WaitForSync
3723
    # does a combined result over all disks, so we don't check its
3724
    # return value
3725
    self.proc.LogStep(5, steps_total, "sync devices")
3726
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3727

    
3728
    # so check manually all the devices
3729
    for name, (dev, old_lvs) in iv_names.iteritems():
3730
      cfg.SetDiskID(dev, pri_node)
3731
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3732
      if is_degr:
3733
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3734

    
3735
    self.proc.LogStep(6, steps_total, "removing old storage")
3736
    for name, (dev, old_lvs) in iv_names.iteritems():
3737
      info("remove logical volumes for %s" % name)
3738
      for lv in old_lvs:
3739
        cfg.SetDiskID(lv, old_node)
3740
        if not rpc.call_blockdev_remove(old_node, lv):
3741
          warning("Can't remove LV on old secondary",
3742
                  hint="Cleanup stale volumes by hand")
3743

    
3744
  def Exec(self, feedback_fn):
3745
    """Execute disk replacement.
3746

3747
    This dispatches the disk replacement to the appropriate handler.
3748

3749
    """
3750
    instance = self.instance
3751
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3752
      fn = self._ExecRR1
3753
    elif instance.disk_template == constants.DT_DRBD8:
3754
      if self.op.remote_node is None:
3755
        fn = self._ExecD8DiskOnly
3756
      else:
3757
        fn = self._ExecD8Secondary
3758
    else:
3759
      raise errors.ProgrammerError("Unhandled disk replacement case")
3760
    return fn(feedback_fn)
3761

    
3762

    
3763
class LUQueryInstanceData(NoHooksLU):
3764
  """Query runtime instance data.
3765

3766
  """
3767
  _OP_REQP = ["instances"]
3768

    
3769
  def CheckPrereq(self):
3770
    """Check prerequisites.
3771

3772
    This only checks the optional instance list against the existing names.
3773

3774
    """
3775
    if not isinstance(self.op.instances, list):
3776
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3777
    if self.op.instances:
3778
      self.wanted_instances = []
3779
      names = self.op.instances
3780
      for name in names:
3781
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3782
        if instance is None:
3783
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3784
        self.wanted_instances.append(instance)
3785
    else:
3786
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3787
                               in self.cfg.GetInstanceList()]
3788
    return
3789

    
3790

    
3791
  def _ComputeDiskStatus(self, instance, snode, dev):
3792
    """Compute block device status.
3793

3794
    """
3795
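    # recursively describe this device: status as seen from the primary,
    # status from the secondary (for mirrored devices) and all children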
    self.cfg.SetDiskID(dev, instance.primary_node)
3796
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3797
    if dev.dev_type in constants.LDS_DRBD:
3798
      # we change the snode then (otherwise we use the one passed in)
3799
      if dev.logical_id[0] == instance.primary_node:
3800
        snode = dev.logical_id[1]
3801
      else:
3802
        snode = dev.logical_id[0]
3803

    
3804
    if snode:
3805
      self.cfg.SetDiskID(dev, snode)
3806
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3807
    else:
3808
      dev_sstatus = None
3809

    
3810
    if dev.children:
3811
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3812
                      for child in dev.children]
3813
    else:
3814
      dev_children = []
3815

    
3816
    data = {
3817
      "iv_name": dev.iv_name,
3818
      "dev_type": dev.dev_type,
3819
      "logical_id": dev.logical_id,
3820
      "physical_id": dev.physical_id,
3821
      "pstatus": dev_pstatus,
3822
      "sstatus": dev_sstatus,
3823
      "children": dev_children,
3824
      }
3825

    
3826
    return data
3827

    
3828
  def Exec(self, feedback_fn):
3829
    """Gather and return data"""
3830
    result = {}
3831
    for instance in self.wanted_instances:
3832
      remote_info = rpc.call_instance_info(instance.primary_node,
3833
                                                instance.name)
3834
      if remote_info and "state" in remote_info:
3835
        remote_state = "up"
3836
      else:
3837
        remote_state = "down"
3838
      if instance.status == "down":
3839
        config_state = "down"
3840
      else:
3841
        config_state = "up"
3842

    
3843
      disks = [self._ComputeDiskStatus(instance, None, device)
3844
               for device in instance.disks]
3845

    
3846
      idict = {
3847
        "name": instance.name,
3848
        "config_state": config_state,
3849
        "run_state": remote_state,
3850
        "pnode": instance.primary_node,
3851
        "snodes": instance.secondary_nodes,
3852
        "os": instance.os,
3853
        "memory": instance.memory,
3854
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3855
        "disks": disks,
3856
        "network_port": instance.network_port,
3857
        "vcpus": instance.vcpus,
3858
        "kernel_path": instance.kernel_path,
3859
        "initrd_path": instance.initrd_path,
3860
        "hvm_boot_order": instance.hvm_boot_order,
3861
        }
3862

    
3863
      result[instance.name] = idict
3864

    
3865
    return result
3866

    
3867

    
3868
class LUSetInstanceParms(LogicalUnit):
3869
  """Modifies an instances's parameters.
3870

3871
  """
3872
  HPATH = "instance-modify"
3873
  HTYPE = constants.HTYPE_INSTANCE
3874
  _OP_REQP = ["instance_name"]
3875

    
3876
  def BuildHooksEnv(self):
3877
    """Build hooks env.
3878

3879
    This runs on the master, primary and secondaries.
3880

3881
    """
3882
    args = dict()
3883
    if self.mem:
3884
      args['memory'] = self.mem
3885
    if self.vcpus:
3886
      args['vcpus'] = self.vcpus
3887
    if self.do_ip or self.do_bridge or self.mac:
3888
      if self.do_ip:
3889
        ip = self.ip
3890
      else:
3891
        ip = self.instance.nics[0].ip
3892
      if self.bridge:
3893
        bridge = self.bridge
3894
      else:
3895
        bridge = self.instance.nics[0].bridge
3896
      if self.mac:
3897
        mac = self.mac
3898
      else:
3899
        mac = self.instance.nics[0].mac
3900
      args['nics'] = [(ip, bridge, mac)]
3901
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3902
    nl = [self.sstore.GetMasterNode(),
3903
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3904
    return env, nl, nl
3905

    
3906
  def CheckPrereq(self):
3907
    """Check prerequisites.
3908

3909
    This only checks the instance list against the existing names.
3910

3911
    """
3912
    self.mem = getattr(self.op, "mem", None)
3913
    self.vcpus = getattr(self.op, "vcpus", None)
3914
    self.ip = getattr(self.op, "ip", None)
3915
    self.mac = getattr(self.op, "mac", None)
3916
    self.bridge = getattr(self.op, "bridge", None)
3917
    self.kernel_path = getattr(self.op, "kernel_path", None)
3918
    self.initrd_path = getattr(self.op, "initrd_path", None)
3919
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
3920
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
3921
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
3922
    if all_parms.count(None) == len(all_parms):
3923
      raise errors.OpPrereqError("No changes submitted")
3924
    if self.mem is not None:
3925
      try:
3926
        self.mem = int(self.mem)
3927
      except ValueError, err:
3928
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3929
    if self.vcpus is not None:
3930
      try:
3931
        self.vcpus = int(self.vcpus)
3932
      except ValueError, err:
3933
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3934
    if self.ip is not None:
3935
      self.do_ip = True
3936
      if self.ip.lower() == "none":
3937
        self.ip = None
3938
      else:
3939
        if not utils.IsValidIP(self.ip):
3940
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3941
    else:
3942
      self.do_ip = False
3943
    self.do_bridge = (self.bridge is not None)
3944
    if self.mac is not None:
3945
      if self.cfg.IsMacInUse(self.mac):
3946
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
3947
                                   self.mac)
3948
      if not utils.IsValidMac(self.mac):
3949
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
3950

    
3951
    if self.kernel_path is not None:
3952
      self.do_kernel_path = True
3953
      if self.kernel_path == constants.VALUE_NONE:
3954
        raise errors.OpPrereqError("Can't set instance to no kernel")
3955

    
3956
      if self.kernel_path != constants.VALUE_DEFAULT:
3957
        if not os.path.isabs(self.kernel_path):
3958
          raise errors.OpPrereqError("The kernel path must be an absolute"
3959
                                    " filename")
3960
    else:
3961
      self.do_kernel_path = False
3962

    
3963
    if self.initrd_path is not None:
3964
      self.do_initrd_path = True
3965
      if self.initrd_path not in (constants.VALUE_NONE,
3966
                                  constants.VALUE_DEFAULT):
3967
        if not os.path.isabs(self.initrd_path):
3968
          raise errors.OpPrereqError("The initrd path must be an absolute"
3969
                                    " filename")
3970
    else:
3971
      self.do_initrd_path = False
3972

    
3973
    # boot order verification
3974
    if self.hvm_boot_order is not None:
3975
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
3976
        if len(self.hvm_boot_order.strip("acdn")) != 0:
3977
          raise errors.OpPrereqError("invalid boot order specified,"
3978
                                     " must be one or more of [acdn]"
3979
                                     " or 'default'")
3980

    
3981
    instance = self.cfg.GetInstanceInfo(
3982
      self.cfg.ExpandInstanceName(self.op.instance_name))
3983
    if instance is None:
3984
      raise errors.OpPrereqError("No such instance name '%s'" %
3985
                                 self.op.instance_name)
3986
    self.op.instance_name = instance.name
3987
    self.instance = instance
3988
    return
3989

    
3990
  def Exec(self, feedback_fn):
3991
    """Modifies an instance.
3992

3993
    All parameters take effect only at the next restart of the instance.
3994
    """
3995
    result = []
3996
    instance = self.instance
3997
    if self.mem:
3998
      instance.memory = self.mem
3999
      result.append(("mem", self.mem))
4000
    if self.vcpus:
4001
      instance.vcpus = self.vcpus
4002
      result.append(("vcpus",  self.vcpus))
4003
    if self.do_ip:
4004
      instance.nics[0].ip = self.ip
4005
      result.append(("ip", self.ip))
4006
    if self.bridge:
4007
      instance.nics[0].bridge = self.bridge
4008
      result.append(("bridge", self.bridge))
4009
    if self.mac:
4010
      instance.nics[0].mac = self.mac
4011
      result.append(("mac", self.mac))
4012
    if self.do_kernel_path:
4013
      instance.kernel_path = self.kernel_path
4014
      result.append(("kernel_path", self.kernel_path))
4015
    if self.do_initrd_path:
4016
      instance.initrd_path = self.initrd_path
4017
      result.append(("initrd_path", self.initrd_path))
4018
    if self.hvm_boot_order:
4019
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4020
        instance.hvm_boot_order = None
4021
      else:
4022
        instance.hvm_boot_order = self.hvm_boot_order
4023
      result.append(("hvm_boot_order", self.hvm_boot_order))
4024

    
4025
    self.cfg.AddInstance(instance)
4026

    
4027
    return result
4028

    
4029

    
4030
class LUQueryExports(NoHooksLU):
4031
  """Query the exports list
4032

4033
  """
4034
  _OP_REQP = []
4035

    
4036
  def CheckPrereq(self):
4037
    """Check that the nodelist contains only existing nodes.
4038

4039
    """
4040
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4041

    
4042
  def Exec(self, feedback_fn):
4043
    """Compute the list of all the exported system images.
4044

4045
    Returns:
4046
      a dictionary with the structure node->(export-list)
4047
      where export-list is a list of the instances exported on
4048
      that node.
4049

4050
    """
4051
    return rpc.call_export_list(self.nodes)
4052

    
4053

    
4054
class LUExportInstance(LogicalUnit):
4055
  """Export an instance to an image in the cluster.
4056

4057
  """
4058
  HPATH = "instance-export"
4059
  HTYPE = constants.HTYPE_INSTANCE
4060
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4061

    
4062
  def BuildHooksEnv(self):
4063
    """Build hooks env.
4064

4065
    This will run on the master, primary node and target node.
4066

4067
    """
4068
    env = {
4069
      "EXPORT_NODE": self.op.target_node,
4070
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4071
      }
4072
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4073
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4074
          self.op.target_node]
4075
    return env, nl, nl
4076

    
4077
  def CheckPrereq(self):
4078
    """Check prerequisites.
4079

4080
    This checks that the instance name is a valid one.
4081

4082
    """
4083
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4084
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4085
    if self.instance is None:
4086
      raise errors.OpPrereqError("Instance '%s' not found" %
4087
                                 self.op.instance_name)
4088

    
4089
    # node verification
4090
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4091
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4092

    
4093
    if self.dst_node is None:
4094
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4095
                                 self.op.target_node)
4096
    self.op.target_node = self.dst_node.name
4097

    
4098
  def Exec(self, feedback_fn):
4099
    """Export an instance to an image in the cluster.
4100

4101
    """
4102
    instance = self.instance
4103
    dst_node = self.dst_node
4104
    src_node = instance.primary_node
4105
    # shutdown the instance, unless requested not to do so
4106
    if self.op.shutdown:
4107
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
4108
      self.proc.ChainOpCode(op)
4109

    
4110
    vgname = self.cfg.GetVGName()
4111

    
4112
    snap_disks = []
4113

    
4114
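    # snapshot the data disk while the instance is (optionally) shut down;
    # the finally clause below restarts it before the snapshot is copied away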
    try:
4115
      for disk in instance.disks:
4116
        if disk.iv_name == "sda":
4117
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4118
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4119

    
4120
          if not new_dev_name:
4121
            logger.Error("could not snapshot block device %s on node %s" %
4122
                         (disk.logical_id[1], src_node))
4123
          else:
4124
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4125
                                      logical_id=(vgname, new_dev_name),
4126
                                      physical_id=(vgname, new_dev_name),
4127
                                      iv_name=disk.iv_name)
4128
            snap_disks.append(new_dev)
4129

    
4130
    finally:
4131
      if self.op.shutdown:
4132
        op = opcodes.OpStartupInstance(instance_name=instance.name,
4133
                                       force=False)
4134
        self.proc.ChainOpCode(op)
4135

    
4136
    # TODO: check for size
4137

    
4138
    for dev in snap_disks:
4139
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4140
                                           instance):
4141
        logger.Error("could not export block device %s from node"
4142
                     " %s to node %s" %
4143
                     (dev.logical_id[1], src_node, dst_node.name))
4144
      if not rpc.call_blockdev_remove(src_node, dev):
4145
        logger.Error("could not remove snapshot block device %s from"
4146
                     " node %s" % (dev.logical_id[1], src_node))
4147

    
4148
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4149
      logger.Error("could not finalize export for instance %s on node %s" %
4150
                   (instance.name, dst_node.name))
4151

    
4152
    nodelist = self.cfg.GetNodeList()
4153
    nodelist.remove(dst_node.name)
4154

    
4155
    # on one-node clusters nodelist will be empty after the removal
4156
    # if we proceed, the backup would be removed because OpQueryExports
4157
    # substitutes an empty list with the full cluster node list.
4158
    if nodelist:
4159
      op = opcodes.OpQueryExports(nodes=nodelist)
4160
      exportlist = self.proc.ChainOpCode(op)
4161
      for node in exportlist:
4162
        if instance.name in exportlist[node]:
4163
          if not rpc.call_export_remove(node, instance.name):
4164
            logger.Error("could not remove older export for instance %s"
4165
                         " on node %s" % (instance.name, node))
4166

    
4167

    
4168
class TagsLU(NoHooksLU):
4169
  """Generic tags LU.
4170

4171
  This is an abstract class which is the parent of all the other tags LUs.
4172

4173
  """
4174
  def CheckPrereq(self):
4175
    """Check prerequisites.
4176

4177
    """
4178
    if self.op.kind == constants.TAG_CLUSTER:
4179
      self.target = self.cfg.GetClusterInfo()
4180
    elif self.op.kind == constants.TAG_NODE:
4181
      name = self.cfg.ExpandNodeName(self.op.name)
4182
      if name is None:
4183
        raise errors.OpPrereqError("Invalid node name (%s)" %
4184
                                   (self.op.name,))
4185
      self.op.name = name
4186
      self.target = self.cfg.GetNodeInfo(name)
4187
    elif self.op.kind == constants.TAG_INSTANCE:
4188
      name = self.cfg.ExpandInstanceName(self.op.name)
4189
      if name is None:
4190
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4191
                                   (self.op.name,))
4192
      self.op.name = name
4193
      self.target = self.cfg.GetInstanceInfo(name)
4194
    else:
4195
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4196
                                 str(self.op.kind))
4197

    
4198

    
4199
class LUGetTags(TagsLU):
4200
  """Returns the tags of a given object.
4201

4202
  """
4203
  _OP_REQP = ["kind", "name"]
4204

    
4205
  def Exec(self, feedback_fn):
4206
    """Returns the tag list.
4207

4208
    """
4209
    return self.target.GetTags()
4210

    
4211

    
4212
class LUSearchTags(NoHooksLU):
4213
  """Searches the tags for a given pattern.
4214

4215
  """
4216
  _OP_REQP = ["pattern"]
4217

    
4218
  def CheckPrereq(self):
4219
    """Check prerequisites.
4220

4221
    This checks the pattern passed for validity by compiling it.
4222

4223
    """
4224
    try:
4225
      self.re = re.compile(self.op.pattern)
4226
    except re.error, err:
4227
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4228
                                 (self.op.pattern, err))
4229

    
4230
  def Exec(self, feedback_fn):
4231
    """Returns the tag list.
4232

4233
    """
4234
    cfg = self.cfg
4235
    tgts = [("/cluster", cfg.GetClusterInfo())]
4236
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4237
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4238
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4239
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4240
    results = []
4241
    for path, target in tgts:
4242
      for tag in target.GetTags():
4243
        if self.re.search(tag):
4244
          results.append((path, tag))
4245
    return results
4246

    
4247

    
4248
class LUAddTags(TagsLU):
4249
  """Sets a tag on a given object.
4250

4251
  """
4252
  _OP_REQP = ["kind", "name", "tags"]
4253

    
4254
  def CheckPrereq(self):
4255
    """Check prerequisites.
4256

4257
    This checks the type and length of the tag name and value.
4258

4259
    """
4260
    TagsLU.CheckPrereq(self)
4261
    for tag in self.op.tags:
4262
      objects.TaggableObject.ValidateTag(tag)
4263

    
4264
  def Exec(self, feedback_fn):
4265
    """Sets the tag.
4266

4267
    """
4268
    try:
4269
      for tag in self.op.tags:
4270
        self.target.AddTag(tag)
4271
    except errors.TagError, err:
4272
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4273
    try:
4274
      self.cfg.Update(self.target)
4275
    except errors.ConfigurationError:
4276
      raise errors.OpRetryError("There has been a modification to the"
4277
                                " config file and the operation has been"
4278
                                " aborted. Please retry.")
4279

    
4280

    
4281
class LUDelTags(TagsLU):
4282
  """Delete a list of tags from a given object.
4283

4284
  """
4285
  _OP_REQP = ["kind", "name", "tags"]
4286

    
4287
  def CheckPrereq(self):
4288
    """Check prerequisites.
4289

4290
    This checks that we have the given tag.
4291

4292
    """
4293
    TagsLU.CheckPrereq(self)
4294
    for tag in self.op.tags:
4295
      objects.TaggableObject.ValidateTag(tag)
4296
    del_tags = frozenset(self.op.tags)
4297
    cur_tags = self.target.GetTags()
4298
    if not del_tags <= cur_tags:
4299
      diff_tags = del_tags - cur_tags
4300
      diff_names = ["'%s'" % tag for tag in diff_tags]
4301
      diff_names.sort()
4302
      raise errors.OpPrereqError("Tag(s) %s not found" %
4303
                                 (",".join(diff_names)))
4304

    
4305
  def Exec(self, feedback_fn):
4306
    """Remove the tag from the object.
4307

4308
    """
4309
    for tag in self.op.tags:
4310
      self.target.RemoveTag(tag)
4311
    try:
4312
      self.cfg.Update(self.target)
4313
    except errors.ConfigurationError:
4314
      raise errors.OpRetryError("There has been a modification to the"
4315
                                " config file and the operation has been"
4316
                                " aborted. Please retry.")
4317

    
4318
class LUTestDelay(NoHooksLU):
4319
  """Sleep for a specified amount of time.
4320

4321
  This LU sleeps on the master and/or nodes for a specified amount of
4322
  time.
4323

4324
  """
4325
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4326

    
4327
  def CheckPrereq(self):
4328
    """Check prerequisites.
4329

4330
    This checks that we have a good list of nodes and/or the duration
4331
    is valid.
4332

4333
    """
4334

    
4335
    if self.op.on_nodes:
4336
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4337

    
4338
  def Exec(self, feedback_fn):
4339
    """Do the actual sleep.
4340

4341
    """
4342
    if self.op.on_master:
4343
      if not utils.TestDelay(self.op.duration):
4344
        raise errors.OpExecError("Error during master delay test")
4345
    if self.op.on_nodes:
4346
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4347
      if not result:
4348
        raise errors.OpExecError("Complete failure from rpc call")
4349
      for node, node_result in result.items():
4350
        if not node_result:
4351
          raise errors.OpExecError("Failure during rpc call to node %s,"
4352
                                   " result: %s" % (node, node_result))