# lib/cmdlib.py @ 871705db


#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on
    which the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_' as this
    will be handled in the hooks runner. Also note additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires
    a cluster to run on (otherwise we don't have a node list). If
    there are no nodes, an empty list (and not None) should be
    returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


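# Illustrative sketch only, not part of the original module: a minimal
# LogicalUnit that follows the contract described in the LogicalUnit
# docstrings above. The class name "LUExampleNoop" and its "message" opcode
# field are hypothetical and exist purely to show the shape of
# CheckPrereq/Exec for a hook-less LU; the real LUs of this module follow
# below.
class LUExampleNoop(NoHooksLU):
  """Example no-op LU: validates its opcode and reports via feedback_fn."""
  _OP_REQP = ["message"]

  def CheckPrereq(self):
    # Canonicalize/validate opcode fields here; prerequisite problems are
    # signalled by raising errors.OpPrereqError.
    if not isinstance(self.op.message, basestring):
      raise errors.OpPrereqError("Parameter 'message' must be a string")

  def Exec(self, feedback_fn):
    # The actual work; expected failures would raise errors.OpExecError.
    feedback_fn("example LU says: %s" % self.op.message)

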
def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


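# Usage note (illustrative values; see e.g. LUQueryNodes.CheckPrereq below
# for a real caller): selecting a field outside static|dynamic raises
# OpPrereqError, e.g.
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])
# would raise "Unknown output fields selected: bogus".

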
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


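# Example of the environment produced above for an instance with a single
# NIC (values are illustrative, not taken from a real cluster); the GANETI_
# prefix is added later by the hooks runner, not here:
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC_COUNT": 1, "INSTANCE_NIC0_IP": "",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33"}

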
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Set up the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Set up the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      raise errors.OpPrereqError("Default file storage directory '%s' does "
                                 "not exist. Please create." %
                                 self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


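# Note on the indices used above (derived only from this function and its
# docstring): the status tuple returned by rpc.call_blockdev_find is indexed
# so that position 5 carries the overall is_degraded flag and position 6 the
# ldisk (local storage) flag; a true value at the chosen index makes the
# device count as not consistent.

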
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
1352
  """Logical unit for adding node to the cluster.
1353

1354
  """
1355
  HPATH = "node-add"
1356
  HTYPE = constants.HTYPE_NODE
1357
  _OP_REQP = ["node_name"]
1358

    
1359
  def BuildHooksEnv(self):
1360
    """Build hooks env.
1361

1362
    This will run on all nodes before, and on all nodes + the new node after.
1363

1364
    """
1365
    env = {
1366
      "OP_TARGET": self.op.node_name,
1367
      "NODE_NAME": self.op.node_name,
1368
      "NODE_PIP": self.op.primary_ip,
1369
      "NODE_SIP": self.op.secondary_ip,
1370
      }
1371
    nodes_0 = self.cfg.GetNodeList()
1372
    nodes_1 = nodes_0 + [self.op.node_name, ]
1373
    return env, nodes_0, nodes_1
1374

    
1375
  def CheckPrereq(self):
1376
    """Check prerequisites.
1377

1378
    This checks:
1379
     - the new node is not already in the config
1380
     - it is resolvable
1381
     - its parameters (single/dual homed) matches the cluster
1382

1383
    Any errors are signalled by raising errors.OpPrereqError.
1384

1385
    """
1386
    node_name = self.op.node_name
1387
    cfg = self.cfg
1388

    
1389
    dns_data = utils.HostInfo(node_name)
1390

    
1391
    node = dns_data.name
1392
    primary_ip = self.op.primary_ip = dns_data.ip
1393
    secondary_ip = getattr(self.op, "secondary_ip", None)
1394
    if secondary_ip is None:
1395
      secondary_ip = primary_ip
1396
    if not utils.IsValidIP(secondary_ip):
1397
      raise errors.OpPrereqError("Invalid secondary IP given")
1398
    self.op.secondary_ip = secondary_ip
1399
    node_list = cfg.GetNodeList()
1400
    if node in node_list:
1401
      raise errors.OpPrereqError("Node %s is already in the configuration"
1402
                                 % node)
1403

    
1404
    for existing_node_name in node_list:
1405
      existing_node = cfg.GetNodeInfo(existing_node_name)
1406
      if (existing_node.primary_ip == primary_ip or
1407
          existing_node.secondary_ip == primary_ip or
1408
          existing_node.primary_ip == secondary_ip or
1409
          existing_node.secondary_ip == secondary_ip):
1410
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1411
                                   " existing node %s" % existing_node.name)
1412

    
1413
    # check that the type of the node (single versus dual homed) is the
1414
    # same as for the master
1415
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1416
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1417
    newbie_singlehomed = secondary_ip == primary_ip
1418
    if master_singlehomed != newbie_singlehomed:
1419
      if master_singlehomed:
1420
        raise errors.OpPrereqError("The master has no private ip but the"
1421
                                   " new node has one")
1422
      else:
1423
        raise errors.OpPrereqError("The master has a private ip but the"
1424
                                   " new node doesn't have one")
1425

    
1426
    # checks reachablity
1427
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1428
      raise errors.OpPrereqError("Node not reachable by ping")
1429

    
1430
    if not newbie_singlehomed:
1431
      # check reachability from my secondary ip to newbie's secondary ip
1432
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1433
                           source=myself.secondary_ip):
1434
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1435
                                   " based ping to noded port")
1436

    
1437
    self.new_node = objects.Node(name=node,
1438
                                 primary_ip=primary_ip,
1439
                                 secondary_ip=secondary_ip)
1440

    
1441
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1442
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1443
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1444
                                   constants.VNC_PASSWORD_FILE)
1445

    
1446
  def Exec(self, feedback_fn):
1447
    """Adds the new node to the cluster.
1448

1449
    """
1450
    new_node = self.new_node
1451
    node = new_node.name
1452

    
1453
    # set up inter-node password and certificate and restarts the node daemon
1454
    gntpass = self.sstore.GetNodeDaemonPassword()
1455
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1456
      raise errors.OpExecError("ganeti password corruption detected")
1457
    f = open(constants.SSL_CERT_FILE)
1458
    try:
1459
      gntpem = f.read(8192)
1460
    finally:
1461
      f.close()
1462
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1463
    # so we use this to detect an invalid certificate; as long as the
1464
    # cert doesn't contain this, the here-document will be correctly
1465
    # parsed by the shell sequence below
1466
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1467
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1468
    if not gntpem.endswith("\n"):
1469
      raise errors.OpExecError("PEM must end with newline")
1470
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1471

    
1472
    # and then connect with ssh to set password and start ganeti-noded
1473
    # note that all the below variables are sanitized at this point,
1474
    # either by being constants or by the checks above
1475
    ss = self.sstore
1476
    mycommand = ("umask 077 && "
1477
                 "echo '%s' > '%s' && "
1478
                 "cat > '%s' << '!EOF.' && \n"
1479
                 "%s!EOF.\n%s restart" %
1480
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1481
                  constants.SSL_CERT_FILE, gntpem,
1482
                  constants.NODE_INITD_SCRIPT))
1483

    
1484
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1485
    if result.failed:
1486
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1487
                               " output: %s" %
1488
                               (node, result.fail_reason, result.output))
1489

    
1490
    # check connectivity
1491
    time.sleep(4)
1492

    
1493
    result = rpc.call_version([node])[node]
1494
    if result:
1495
      if constants.PROTOCOL_VERSION == result:
1496
        logger.Info("communication to node %s fine, sw version %s match" %
1497
                    (node, result))
1498
      else:
1499
        raise errors.OpExecError("Version mismatch master version %s,"
1500
                                 " node version %s" %
1501
                                 (constants.PROTOCOL_VERSION, result))
1502
    else:
1503
      raise errors.OpExecError("Cannot get version from the new node")
1504

    
1505
    # setup ssh on node
1506
    logger.Info("copy ssh key to node %s" % node)
1507
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1508
    keyarray = []
1509
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1510
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1511
                priv_key, pub_key]
1512

    
1513
    for i in keyfiles:
1514
      f = open(i, 'r')
1515
      try:
1516
        keyarray.append(f.read())
1517
      finally:
1518
        f.close()
1519

    
1520
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1521
                               keyarray[3], keyarray[4], keyarray[5])
1522

    
1523
    if not result:
1524
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1525

    
1526
    # Add node to our /etc/hosts, and add key to known_hosts
1527
    _AddHostToEtcHosts(new_node.name)
1528

    
1529
    if new_node.secondary_ip != new_node.primary_ip:
1530
      if not rpc.call_node_tcp_ping(new_node.name,
1531
                                    constants.LOCALHOST_IP_ADDRESS,
1532
                                    new_node.secondary_ip,
1533
                                    constants.DEFAULT_NODED_PORT,
1534
                                    10, False):
1535
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1536
                                 " you gave (%s). Please fix and re-run this"
1537
                                 " command." % new_node.secondary_ip)
1538

    
1539
    success, msg = self.ssh.VerifyNodeHostname(node)
1540
    if not success:
1541
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1542
                               " than the one the resolver gives: %s."
1543
                               " Please fix and re-run this command." %
1544
                               (node, msg))
1545

    
1546
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1547
    # including the node just added
1548
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1549
    dist_nodes = self.cfg.GetNodeList() + [node]
1550
    if myself.name in dist_nodes:
1551
      dist_nodes.remove(myself.name)
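    # the master already holds current copies of these files, so it is
    # excluded from the distribution list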
1552

    
1553
    logger.Debug("Copying hosts and known_hosts to all nodes")
1554
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1555
      result = rpc.call_upload_file(dist_nodes, fname)
1556
      for to_node in dist_nodes:
1557
        if not result[to_node]:
1558
          logger.Error("copy of file %s to node %s failed" %
1559
                       (fname, to_node))
1560

    
1561
    to_copy = ss.GetFileList()
1562
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1563
      to_copy.append(constants.VNC_PASSWORD_FILE)
1564
    for fname in to_copy:
1565
      if not self.ssh.CopyFileToNode(node, fname):
1566
        logger.Error("could not copy file %s to node %s" % (fname, node))
1567

    
1568
    logger.Info("adding node %s to cluster.conf" % node)
1569
    self.cfg.AddNode(new_node)
1570

    
1571

    
1572
class LUMasterFailover(LogicalUnit):
1573
  """Failover the master node to the current node.
1574

1575
  This is a special LU in that it must run on a non-master node.
1576

1577
  """
1578
  HPATH = "master-failover"
1579
  HTYPE = constants.HTYPE_CLUSTER
1580
  REQ_MASTER = False
1581
  _OP_REQP = []
1582

    
1583
  def BuildHooksEnv(self):
1584
    """Build hooks env.
1585

1586
    This will run on the new master only in the pre phase, and on all
1587
    the nodes in the post phase.
1588

1589
    """
1590
    env = {
1591
      "OP_TARGET": self.new_master,
1592
      "NEW_MASTER": self.new_master,
1593
      "OLD_MASTER": self.old_master,
1594
      }
1595
    return env, [self.new_master], self.cfg.GetNodeList()
1596

    
1597
  def CheckPrereq(self):
1598
    """Check prerequisites.
1599

1600
    This checks that we are not already the master.
1601

1602
    """
1603
    self.new_master = utils.HostInfo().name
1604
    self.old_master = self.sstore.GetMasterNode()
1605

    
1606
    if self.old_master == self.new_master:
1607
      raise errors.OpPrereqError("This commands must be run on the node"
1608
                                 " where you want the new master to be."
1609
                                 " %s is already the master" %
1610
                                 self.old_master)
1611

    
1612
  def Exec(self, feedback_fn):
1613
    """Failover the master node.
1614

1615
    This command, when run on a non-master node, will cause the current
1616
    master to cease being master, and the non-master to become the new
1617
    master.
1618

1619
    """
1620
    #TODO: do not rely on gethostname returning the FQDN
1621
    logger.Info("setting master to %s, old master: %s" %
1622
                (self.new_master, self.old_master))
1623

    
1624
    if not rpc.call_node_stop_master(self.old_master):
1625
      logger.Error("could disable the master role on the old master"
1626
                   " %s, please disable manually" % self.old_master)
1627

    
1628
    ss = self.sstore
1629
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1630
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1631
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1632
      logger.Error("could not distribute the new simple store master file"
1633
                   " to the other nodes, please check.")
1634

    
1635
    if not rpc.call_node_start_master(self.new_master):
1636
      logger.Error("could not start the master role on the new master"
1637
                   " %s, please check" % self.new_master)
1638
      feedback_fn("Error in activating the master IP on the new master,"
1639
                  " please fix manually.")
1640

    
1641

    
1642

    
1643
class LUQueryClusterInfo(NoHooksLU):
1644
  """Query cluster configuration.
1645

1646
  """
1647
  _OP_REQP = []
1648
  REQ_MASTER = False
1649

    
1650
  def CheckPrereq(self):
1651
    """No prerequsites needed for this LU.
1652

1653
    """
1654
    pass
1655

    
1656
  def Exec(self, feedback_fn):
1657
    """Return cluster config.
1658

1659
    """
1660
    result = {
1661
      "name": self.sstore.GetClusterName(),
1662
      "software_version": constants.RELEASE_VERSION,
1663
      "protocol_version": constants.PROTOCOL_VERSION,
1664
      "config_version": constants.CONFIG_VERSION,
1665
      "os_api_version": constants.OS_API_VERSION,
1666
      "export_version": constants.EXPORT_VERSION,
1667
      "master": self.sstore.GetMasterNode(),
1668
      "architecture": (platform.architecture()[0], platform.machine()),
1669
      }
1670

    
1671
    return result
1672

    
1673

    
1674
class LUClusterCopyFile(NoHooksLU):
1675
  """Copy file to cluster.
1676

1677
  """
1678
  _OP_REQP = ["nodes", "filename"]
1679

    
1680
  def CheckPrereq(self):
1681
    """Check prerequisites.
1682

1683
    It should check that the named file exists and that the given list
1684
    of nodes is valid.
1685

1686
    """
1687
    if not os.path.exists(self.op.filename):
1688
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1689

    
1690
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1691

    
1692
  def Exec(self, feedback_fn):
1693
    """Copy a file from master to some nodes.
1694

1695
    The file named in the opcode is copied from the master to every node in
    self.nodes, skipping the master node itself.
1700

1701
    """
1702
    filename = self.op.filename
1703

    
1704
    myname = utils.HostInfo().name
1705

    
1706
    for node in self.nodes:
1707
      if node == myname:
1708
        continue
1709
      if not self.ssh.CopyFileToNode(node, filename):
1710
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1711

    
1712

    
1713
class LUDumpClusterConfig(NoHooksLU):
1714
  """Return a text-representation of the cluster-config.
1715

1716
  """
1717
  _OP_REQP = []
1718

    
1719
  def CheckPrereq(self):
1720
    """No prerequisites.
1721

1722
    """
1723
    pass
1724

    
1725
  def Exec(self, feedback_fn):
1726
    """Dump a representation of the cluster config to the standard output.
1727

1728
    """
1729
    return self.cfg.DumpConfig()
1730

    
1731

    
1732
class LURunClusterCommand(NoHooksLU):
1733
  """Run a command on some nodes.
1734

1735
  """
1736
  _OP_REQP = ["command", "nodes"]
1737

    
1738
  def CheckPrereq(self):
1739
    """Check prerequisites.
1740

1741
    It checks that the given list of nodes is valid.
1742

1743
    """
1744
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1745

    
1746
  def Exec(self, feedback_fn):
1747
    """Run a command on some nodes.
1748

1749
    """
1750
    data = []
1751
    for node in self.nodes:
1752
      result = self.ssh.Run(node, "root", self.op.command)
1753
      data.append((node, result.output, result.exit_code))
1754

    
1755
    return data
1756

    
1757

    
1758
class LUActivateInstanceDisks(NoHooksLU):
1759
  """Bring up an instance's disks.
1760

1761
  """
1762
  _OP_REQP = ["instance_name"]
1763

    
1764
  def CheckPrereq(self):
1765
    """Check prerequisites.
1766

1767
    This checks that the instance is in the cluster.
1768

1769
    """
1770
    instance = self.cfg.GetInstanceInfo(
1771
      self.cfg.ExpandInstanceName(self.op.instance_name))
1772
    if instance is None:
1773
      raise errors.OpPrereqError("Instance '%s' not known" %
1774
                                 self.op.instance_name)
1775
    self.instance = instance
1776

    
1777

    
1778
  def Exec(self, feedback_fn):
1779
    """Activate the disks.
1780

1781
    """
1782
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1783
    if not disks_ok:
1784
      raise errors.OpExecError("Cannot activate block devices")
1785

    
1786
    return disks_info
1787

    
1788

    
1789
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1790
  """Prepare the block devices for an instance.
1791

1792
  This sets up the block devices on all nodes.
1793

1794
  Args:
1795
    instance: a ganeti.objects.Instance object
1796
    ignore_secondaries: if true, errors on secondary nodes won't result
1797
                        in an error return from the function
1798

1799
  Returns:
1800
    a tuple (disks_ok, device_info), where disks_ok is a boolean and
    device_info is a list of (node, instance_visible_name, node_info)
    tuples mapping node devices to instance devices
1803
  """
1804
  device_info = []
1805
  disks_ok = True
1806
  iname = instance.name
1807
  # With the two passes mechanism we try to reduce the window of
1808
  # opportunity for the race condition of switching DRBD to primary
1809
  # before handshaking occurred, but we do not eliminate it
1810

    
1811
  # The proper fix would be to wait (with some limits) until the
1812
  # connection has been made and drbd transitions from WFConnection
1813
  # into any other network-connected state (Connected, SyncTarget,
1814
  # SyncSource, etc.)
1815

    
1816
  # 1st pass, assemble on all nodes in secondary mode
1817
  for inst_disk in instance.disks:
1818
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1819
      cfg.SetDiskID(node_disk, node)
1820
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1821
      if not result:
1822
        logger.Error("could not prepare block device %s on node %s"
1823
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1824
        if not ignore_secondaries:
1825
          disks_ok = False
1826

    
1827
  # FIXME: race condition on drbd migration to primary
1828

    
1829
  # 2nd pass, do only the primary node
1830
  for inst_disk in instance.disks:
1831
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1832
      if node != instance.primary_node:
1833
        continue
1834
      cfg.SetDiskID(node_disk, node)
1835
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1836
      if not result:
1837
        logger.Error("could not prepare block device %s on node %s"
1838
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1839
        disks_ok = False
1840
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1841

    
1842
  # leave the disks configured for the primary node
1843
  # this is a workaround that would be fixed better by
1844
  # improving the logical/physical id handling
1845
  for disk in instance.disks:
1846
    cfg.SetDiskID(disk, instance.primary_node)
1847

    
1848
  return disks_ok, device_info
1849

    
1850

    
1851
def _StartInstanceDisks(cfg, instance, force):
1852
  """Start the disks of an instance.
1853

1854
  """
1855
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1856
                                           ignore_secondaries=force)
1857
  if not disks_ok:
1858
    _ShutdownInstanceDisks(instance, cfg)
1859
    if force is not None and not force:
1860
      logger.Error("If the message above refers to a secondary node,"
1861
                   " you can retry the operation using '--force'.")
1862
    raise errors.OpExecError("Disk consistency error")
1863

    
1864

    
1865
class LUDeactivateInstanceDisks(NoHooksLU):
1866
  """Shutdown an instance's disks.
1867

1868
  """
1869
  _OP_REQP = ["instance_name"]
1870

    
1871
  def CheckPrereq(self):
1872
    """Check prerequisites.
1873

1874
    This checks that the instance is in the cluster.
1875

1876
    """
1877
    instance = self.cfg.GetInstanceInfo(
1878
      self.cfg.ExpandInstanceName(self.op.instance_name))
1879
    if instance is None:
1880
      raise errors.OpPrereqError("Instance '%s' not known" %
1881
                                 self.op.instance_name)
1882
    self.instance = instance
1883

    
1884
  def Exec(self, feedback_fn):
1885
    """Deactivate the disks
1886

1887
    """
1888
    instance = self.instance
1889
    ins_l = rpc.call_instance_list([instance.primary_node])
1890
    ins_l = ins_l[instance.primary_node]
1891
    if not isinstance(ins_l, list):
1892
      raise errors.OpExecError("Can't contact node '%s'" %
1893
                               instance.primary_node)
1894

    
1895
    if self.instance.name in ins_l:
1896
      raise errors.OpExecError("Instance is running, can't shutdown"
1897
                               " block devices.")
1898

    
1899
    _ShutdownInstanceDisks(instance, self.cfg)
1900

    
1901

    
1902
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1903
  """Shutdown block devices of an instance.
1904

1905
  This does the shutdown on all nodes of the instance.
1906

1907
  Errors on secondary nodes always cause a False return value; errors on
  the primary node are ignored only if ignore_primary is true.
1909

1910
  """
1911
  result = True
1912
  for disk in instance.disks:
1913
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1914
      cfg.SetDiskID(top_disk, node)
1915
      if not rpc.call_blockdev_shutdown(node, top_disk):
1916
        logger.Error("could not shutdown block device %s on node %s" %
1917
                     (disk.iv_name, node))
1918
        if not ignore_primary or node != instance.primary_node:
1919
          result = False
1920
  return result
1921

    
1922

    
1923
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1924
  """Checks if a node has enough free memory.
1925

1926
  This function checks whether a given node has the needed amount of free
  memory. If the node has less memory than requested, or the information
  cannot be retrieved from the node, an OpPrereqError exception is raised.
1930

1931
  Args:
1932
    - cfg: a ConfigWriter instance
1933
    - node: the node name
1934
    - reason: string to use in the error message
1935
    - requested: the amount of memory in MiB
1936

1937
  """
1938
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
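  # call_node_info returns a mapping from node name to a dict of resource
  # values; anything else means the node could not be contacted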
1939
  if not nodeinfo or not isinstance(nodeinfo, dict):
1940
    raise errors.OpPrereqError("Could not contact node %s for resource"
1941
                             " information" % (node,))
1942

    
1943
  free_mem = nodeinfo[node].get('memory_free')
1944
  if not isinstance(free_mem, int):
1945
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1946
                             " was '%s'" % (node, free_mem))
1947
  if requested > free_mem:
1948
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1949
                             " needed %s MiB, available %s MiB" %
1950
                             (node, reason, requested, free_mem))
1951

    
1952

    
1953
class LUStartupInstance(LogicalUnit):
1954
  """Starts an instance.
1955

1956
  """
1957
  HPATH = "instance-start"
1958
  HTYPE = constants.HTYPE_INSTANCE
1959
  _OP_REQP = ["instance_name", "force"]
1960

    
1961
  def BuildHooksEnv(self):
1962
    """Build hooks env.
1963

1964
    This runs on master, primary and secondary nodes of the instance.
1965

1966
    """
1967
    env = {
1968
      "FORCE": self.op.force,
1969
      }
1970
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1971
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1972
          list(self.instance.secondary_nodes))
1973
    return env, nl, nl
1974

    
1975
  def CheckPrereq(self):
1976
    """Check prerequisites.
1977

1978
    This checks that the instance is in the cluster.
1979

1980
    """
1981
    instance = self.cfg.GetInstanceInfo(
1982
      self.cfg.ExpandInstanceName(self.op.instance_name))
1983
    if instance is None:
1984
      raise errors.OpPrereqError("Instance '%s' not known" %
1985
                                 self.op.instance_name)
1986

    
1987
    # check bridges existence
1988
    _CheckInstanceBridgesExist(instance)
1989

    
1990
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
1991
                         "starting instance %s" % instance.name,
1992
                         instance.memory)
1993

    
1994
    self.instance = instance
1995
    self.op.instance_name = instance.name
1996

    
1997
  def Exec(self, feedback_fn):
1998
    """Start the instance.
1999

2000
    """
2001
    instance = self.instance
2002
    force = self.op.force
2003
    extra_args = getattr(self.op, "extra_args", "")
2004

    
2005
    node_current = instance.primary_node
2006

    
2007
    _StartInstanceDisks(self.cfg, instance, force)
2008

    
2009
    if not rpc.call_instance_start(node_current, instance, extra_args):
2010
      _ShutdownInstanceDisks(instance, self.cfg)
2011
      raise errors.OpExecError("Could not start instance")
2012

    
2013
    self.cfg.MarkInstanceUp(instance.name)
2014

    
2015

    
2016
class LURebootInstance(LogicalUnit):
2017
  """Reboot an instance.
2018

2019
  """
2020
  HPATH = "instance-reboot"
2021
  HTYPE = constants.HTYPE_INSTANCE
2022
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2023

    
2024
  def BuildHooksEnv(self):
2025
    """Build hooks env.
2026

2027
    This runs on master, primary and secondary nodes of the instance.
2028

2029
    """
2030
    env = {
2031
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2032
      }
2033
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2034
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2035
          list(self.instance.secondary_nodes))
2036
    return env, nl, nl
2037

    
2038
  def CheckPrereq(self):
2039
    """Check prerequisites.
2040

2041
    This checks that the instance is in the cluster.
2042

2043
    """
2044
    instance = self.cfg.GetInstanceInfo(
2045
      self.cfg.ExpandInstanceName(self.op.instance_name))
2046
    if instance is None:
2047
      raise errors.OpPrereqError("Instance '%s' not known" %
2048
                                 self.op.instance_name)
2049

    
2050
    # check bridges existence
2051
    _CheckInstanceBridgesExist(instance)
2052

    
2053
    self.instance = instance
2054
    self.op.instance_name = instance.name
2055

    
2056
  def Exec(self, feedback_fn):
2057
    """Reboot the instance.
2058

2059
    """
2060
    instance = self.instance
2061
    ignore_secondaries = self.op.ignore_secondaries
2062
    reboot_type = self.op.reboot_type
2063
    extra_args = getattr(self.op, "extra_args", "")
2064

    
2065
    node_current = instance.primary_node
2066

    
2067
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2068
                           constants.INSTANCE_REBOOT_HARD,
2069
                           constants.INSTANCE_REBOOT_FULL]:
2070
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2071
                                  (constants.INSTANCE_REBOOT_SOFT,
2072
                                   constants.INSTANCE_REBOOT_HARD,
2073
                                   constants.INSTANCE_REBOOT_FULL))
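    # soft and hard reboots are delegated to the hypervisor on the node; a
    # full reboot is emulated below as shutdown, disk restart and start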
2074

    
2075
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2076
                       constants.INSTANCE_REBOOT_HARD]:
2077
      if not rpc.call_instance_reboot(node_current, instance,
2078
                                      reboot_type, extra_args):
2079
        raise errors.OpExecError("Could not reboot instance")
2080
    else:
2081
      if not rpc.call_instance_shutdown(node_current, instance):
2082
        raise errors.OpExecError("could not shutdown instance for full reboot")
2083
      _ShutdownInstanceDisks(instance, self.cfg)
2084
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2085
      if not rpc.call_instance_start(node_current, instance, extra_args):
2086
        _ShutdownInstanceDisks(instance, self.cfg)
2087
        raise errors.OpExecError("Could not start instance for full reboot")
2088

    
2089
    self.cfg.MarkInstanceUp(instance.name)
2090

    
2091

    
2092
class LUShutdownInstance(LogicalUnit):
2093
  """Shutdown an instance.
2094

2095
  """
2096
  HPATH = "instance-stop"
2097
  HTYPE = constants.HTYPE_INSTANCE
2098
  _OP_REQP = ["instance_name"]
2099

    
2100
  def BuildHooksEnv(self):
2101
    """Build hooks env.
2102

2103
    This runs on master, primary and secondary nodes of the instance.
2104

2105
    """
2106
    env = _BuildInstanceHookEnvByObject(self.instance)
2107
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2108
          list(self.instance.secondary_nodes))
2109
    return env, nl, nl
2110

    
2111
  def CheckPrereq(self):
2112
    """Check prerequisites.
2113

2114
    This checks that the instance is in the cluster.
2115

2116
    """
2117
    instance = self.cfg.GetInstanceInfo(
2118
      self.cfg.ExpandInstanceName(self.op.instance_name))
2119
    if instance is None:
2120
      raise errors.OpPrereqError("Instance '%s' not known" %
2121
                                 self.op.instance_name)
2122
    self.instance = instance
2123

    
2124
  def Exec(self, feedback_fn):
2125
    """Shutdown the instance.
2126

2127
    """
2128
    instance = self.instance
2129
    node_current = instance.primary_node
2130
    if not rpc.call_instance_shutdown(node_current, instance):
2131
      logger.Error("could not shutdown instance")
2132

    
2133
    self.cfg.MarkInstanceDown(instance.name)
2134
    _ShutdownInstanceDisks(instance, self.cfg)
2135

    
2136

    
2137
class LUReinstallInstance(LogicalUnit):
2138
  """Reinstall an instance.
2139

2140
  """
2141
  HPATH = "instance-reinstall"
2142
  HTYPE = constants.HTYPE_INSTANCE
2143
  _OP_REQP = ["instance_name"]
2144

    
2145
  def BuildHooksEnv(self):
2146
    """Build hooks env.
2147

2148
    This runs on master, primary and secondary nodes of the instance.
2149

2150
    """
2151
    env = _BuildInstanceHookEnvByObject(self.instance)
2152
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2153
          list(self.instance.secondary_nodes))
2154
    return env, nl, nl
2155

    
2156
  def CheckPrereq(self):
2157
    """Check prerequisites.
2158

2159
    This checks that the instance is in the cluster and is not running.
2160

2161
    """
2162
    instance = self.cfg.GetInstanceInfo(
2163
      self.cfg.ExpandInstanceName(self.op.instance_name))
2164
    if instance is None:
2165
      raise errors.OpPrereqError("Instance '%s' not known" %
2166
                                 self.op.instance_name)
2167
    if instance.disk_template == constants.DT_DISKLESS:
2168
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2169
                                 self.op.instance_name)
2170
    if instance.status != "down":
2171
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2172
                                 self.op.instance_name)
2173
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2174
    if remote_info:
2175
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2176
                                 (self.op.instance_name,
2177
                                  instance.primary_node))
2178

    
2179
    self.op.os_type = getattr(self.op, "os_type", None)
2180
    if self.op.os_type is not None:
2181
      # OS verification
2182
      pnode = self.cfg.GetNodeInfo(
2183
        self.cfg.ExpandNodeName(instance.primary_node))
2184
      if pnode is None:
2185
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2186
                                   self.op.pnode)
2187
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2188
      if not os_obj:
2189
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2190
                                   " primary node"  % self.op.os_type)
2191

    
2192
    self.instance = instance
2193

    
2194
  def Exec(self, feedback_fn):
2195
    """Reinstall the instance.
2196

2197
    """
2198
    inst = self.instance
2199

    
2200
    if self.op.os_type is not None:
2201
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2202
      inst.os = self.op.os_type
2203
      self.cfg.AddInstance(inst)
2204

    
2205
    _StartInstanceDisks(self.cfg, inst, None)
2206
    try:
2207
      feedback_fn("Running the instance OS create scripts...")
2208
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2209
        raise errors.OpExecError("Could not install OS for instance %s"
2210
                                 " on node %s" %
2211
                                 (inst.name, inst.primary_node))
2212
    finally:
2213
      _ShutdownInstanceDisks(inst, self.cfg)
2214

    
2215

    
2216
class LURenameInstance(LogicalUnit):
2217
  """Rename an instance.
2218

2219
  """
2220
  HPATH = "instance-rename"
2221
  HTYPE = constants.HTYPE_INSTANCE
2222
  _OP_REQP = ["instance_name", "new_name"]
2223

    
2224
  def BuildHooksEnv(self):
2225
    """Build hooks env.
2226

2227
    This runs on master, primary and secondary nodes of the instance.
2228

2229
    """
2230
    env = _BuildInstanceHookEnvByObject(self.instance)
2231
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2232
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2233
          list(self.instance.secondary_nodes))
2234
    return env, nl, nl
2235

    
2236
  def CheckPrereq(self):
2237
    """Check prerequisites.
2238

2239
    This checks that the instance is in the cluster and is not running.
2240

2241
    """
2242
    instance = self.cfg.GetInstanceInfo(
2243
      self.cfg.ExpandInstanceName(self.op.instance_name))
2244
    if instance is None:
2245
      raise errors.OpPrereqError("Instance '%s' not known" %
2246
                                 self.op.instance_name)
2247
    if instance.status != "down":
2248
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2249
                                 self.op.instance_name)
2250
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2251
    if remote_info:
2252
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2253
                                 (self.op.instance_name,
2254
                                  instance.primary_node))
2255
    self.instance = instance
2256

    
2257
    # new name verification
2258
    name_info = utils.HostInfo(self.op.new_name)
2259

    
2260
    self.op.new_name = new_name = name_info.name
2261
    instance_list = self.cfg.GetInstanceList()
2262
    if new_name in instance_list:
2263
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2264
                                 new_name)
2265

    
2266
    if not getattr(self.op, "ignore_ip", False):
2267
      command = ["fping", "-q", name_info.ip]
2268
      result = utils.RunCmd(command)
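      # fping exits with an error when the address does not answer, so a
      # successful run means the IP of the new name is already in use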
2269
      if not result.failed:
2270
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2271
                                   (name_info.ip, new_name))
2272

    
2273

    
2274
  def Exec(self, feedback_fn):
2275
    """Reinstall the instance.
2276

2277
    """
2278
    inst = self.instance
2279
    old_name = inst.name
2280

    
2281
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2282

    
2283
    # re-read the instance from the configuration after rename
2284
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2285

    
2286
    _StartInstanceDisks(self.cfg, inst, None)
2287
    try:
2288
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2289
                                          "sda", "sdb"):
2290
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2291
               " instance has been renamed in Ganeti)" %
2292
               (inst.name, inst.primary_node))
2293
        logger.Error(msg)
2294
    finally:
2295
      _ShutdownInstanceDisks(inst, self.cfg)
2296

    
2297

    
2298
class LURemoveInstance(LogicalUnit):
2299
  """Remove an instance.
2300

2301
  """
2302
  HPATH = "instance-remove"
2303
  HTYPE = constants.HTYPE_INSTANCE
2304
  _OP_REQP = ["instance_name"]
2305

    
2306
  def BuildHooksEnv(self):
2307
    """Build hooks env.
2308

2309
    This runs on master, primary and secondary nodes of the instance.
2310

2311
    """
2312
    env = _BuildInstanceHookEnvByObject(self.instance)
2313
    nl = [self.sstore.GetMasterNode()]
2314
    return env, nl, nl
2315

    
2316
  def CheckPrereq(self):
2317
    """Check prerequisites.
2318

2319
    This checks that the instance is in the cluster.
2320

2321
    """
2322
    instance = self.cfg.GetInstanceInfo(
2323
      self.cfg.ExpandInstanceName(self.op.instance_name))
2324
    if instance is None:
2325
      raise errors.OpPrereqError("Instance '%s' not known" %
2326
                                 self.op.instance_name)
2327
    self.instance = instance
2328

    
2329
  def Exec(self, feedback_fn):
2330
    """Remove the instance.
2331

2332
    """
2333
    instance = self.instance
2334
    logger.Info("shutting down instance %s on node %s" %
2335
                (instance.name, instance.primary_node))
2336

    
2337
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2338
      if self.op.ignore_failures:
2339
        feedback_fn("Warning: can't shutdown instance")
2340
      else:
2341
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2342
                                 (instance.name, instance.primary_node))
2343

    
2344
    logger.Info("removing block devices for instance %s" % instance.name)
2345

    
2346
    if not _RemoveDisks(instance, self.cfg):
2347
      if self.op.ignore_failures:
2348
        feedback_fn("Warning: can't remove instance's disks")
2349
      else:
2350
        raise errors.OpExecError("Can't remove instance's disks")
2351

    
2352
    logger.Info("removing instance %s out of cluster config" % instance.name)
2353

    
2354
    self.cfg.RemoveInstance(instance.name)
2355

    
2356

    
2357
class LUQueryInstances(NoHooksLU):
2358
  """Logical unit for querying instances.
2359

2360
  """
2361
  _OP_REQP = ["output_fields", "names"]
2362

    
2363
  def CheckPrereq(self):
2364
    """Check prerequisites.
2365

2366
    This checks that the fields required are valid output fields.
2367

2368
    """
2369
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2370
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2371
                               "admin_state", "admin_ram",
2372
                               "disk_template", "ip", "mac", "bridge",
2373
                               "sda_size", "sdb_size", "vcpus"],
2374
                       dynamic=self.dynamic_fields,
2375
                       selected=self.op.output_fields)
2376

    
2377
    self.wanted = _GetWantedInstances(self, self.op.names)
2378

    
2379
  def Exec(self, feedback_fn):
2380
    """Computes the list of nodes and their attributes.
2381

2382
    """
2383
    instance_names = self.wanted
2384
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2385
                     in instance_names]
2386

    
2387
    # begin data gathering
2388

    
2389
    nodes = frozenset([inst.primary_node for inst in instance_list])
2390

    
2391
    bad_nodes = []
2392
    if self.dynamic_fields.intersection(self.op.output_fields):
2393
      live_data = {}
2394
      node_data = rpc.call_all_instances_info(nodes)
2395
      for name in nodes:
2396
        result = node_data[name]
2397
        if result:
2398
          live_data.update(result)
2399
        elif result == False:
2400
          bad_nodes.append(name)
2401
        # else no instance is alive
2402
    else:
2403
      live_data = dict([(name, {}) for name in instance_names])
2404

    
2405
    # end data gathering
2406

    
2407
    output = []
2408
    for instance in instance_list:
2409
      iout = []
2410
      for field in self.op.output_fields:
2411
        if field == "name":
2412
          val = instance.name
2413
        elif field == "os":
2414
          val = instance.os
2415
        elif field == "pnode":
2416
          val = instance.primary_node
2417
        elif field == "snodes":
2418
          val = list(instance.secondary_nodes)
2419
        elif field == "admin_state":
2420
          val = (instance.status != "down")
2421
        elif field == "oper_state":
2422
          if instance.primary_node in bad_nodes:
2423
            val = None
2424
          else:
2425
            val = bool(live_data.get(instance.name))
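        # "status" combines the configured (admin) state with the live data:
        # running/ERROR_up if the instance was found on its primary node,
        # ERROR_down/ADMIN_down if it was not, and ERROR_nodedown if the
        # primary node itself could not be queried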
2426
        elif field == "status":
2427
          if instance.primary_node in bad_nodes:
2428
            val = "ERROR_nodedown"
2429
          else:
2430
            running = bool(live_data.get(instance.name))
2431
            if running:
2432
              if instance.status != "down":
2433
                val = "running"
2434
              else:
2435
                val = "ERROR_up"
2436
            else:
2437
              if instance.status != "down":
2438
                val = "ERROR_down"
2439
              else:
2440
                val = "ADMIN_down"
2441
        elif field == "admin_ram":
2442
          val = instance.memory
2443
        elif field == "oper_ram":
2444
          if instance.primary_node in bad_nodes:
2445
            val = None
2446
          elif instance.name in live_data:
2447
            val = live_data[instance.name].get("memory", "?")
2448
          else:
2449
            val = "-"
2450
        elif field == "disk_template":
2451
          val = instance.disk_template
2452
        elif field == "ip":
2453
          val = instance.nics[0].ip
2454
        elif field == "bridge":
2455
          val = instance.nics[0].bridge
2456
        elif field == "mac":
2457
          val = instance.nics[0].mac
2458
        elif field == "sda_size" or field == "sdb_size":
2459
          disk = instance.FindDisk(field[:3])
2460
          if disk is None:
2461
            val = None
2462
          else:
2463
            val = disk.size
2464
        elif field == "vcpus":
2465
          val = instance.vcpus
2466
        else:
2467
          raise errors.ParameterError(field)
2468
        iout.append(val)
2469
      output.append(iout)
2470

    
2471
    return output
2472

    
2473

    
2474
class LUFailoverInstance(LogicalUnit):
2475
  """Failover an instance.
2476

2477
  """
2478
  HPATH = "instance-failover"
2479
  HTYPE = constants.HTYPE_INSTANCE
2480
  _OP_REQP = ["instance_name", "ignore_consistency"]
2481

    
2482
  def BuildHooksEnv(self):
2483
    """Build hooks env.
2484

2485
    This runs on master, primary and secondary nodes of the instance.
2486

2487
    """
2488
    env = {
2489
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2490
      }
2491
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2492
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2493
    return env, nl, nl
2494

    
2495
  def CheckPrereq(self):
2496
    """Check prerequisites.
2497

2498
    This checks that the instance is in the cluster.
2499

2500
    """
2501
    instance = self.cfg.GetInstanceInfo(
2502
      self.cfg.ExpandInstanceName(self.op.instance_name))
2503
    if instance is None:
2504
      raise errors.OpPrereqError("Instance '%s' not known" %
2505
                                 self.op.instance_name)
2506

    
2507
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2508
      raise errors.OpPrereqError("Instance's disk layout is not"
2509
                                 " network mirrored, cannot failover.")
2510

    
2511
    secondary_nodes = instance.secondary_nodes
2512
    if not secondary_nodes:
2513
      raise errors.ProgrammerError("no secondary node but using "
2514
                                   "DT_REMOTE_RAID1 template")
2515

    
2516
    target_node = secondary_nodes[0]
2517
    # check memory requirements on the secondary node
2518
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2519
                         instance.name, instance.memory)
2520

    
2521
    # check bridge existence
2522
    brlist = [nic.bridge for nic in instance.nics]
2523
    if not rpc.call_bridges_exist(target_node, brlist):
2524
      raise errors.OpPrereqError("One or more target bridges %s does not"
2525
                                 " exist on destination node '%s'" %
2526
                                 (brlist, target_node))
2527

    
2528
    self.instance = instance
2529

    
2530
  def Exec(self, feedback_fn):
2531
    """Failover an instance.
2532

2533
    The failover is done by shutting it down on its present node and
2534
    starting it on the secondary.
2535

2536
    """
2537
    instance = self.instance
2538

    
2539
    source_node = instance.primary_node
2540
    target_node = instance.secondary_nodes[0]
2541

    
2542
    feedback_fn("* checking disk consistency between source and target")
2543
    for dev in instance.disks:
2544
      # for remote_raid1, these are md over drbd
2545
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2546
        if not self.op.ignore_consistency:
2547
          raise errors.OpExecError("Disk %s is degraded on target node,"
2548
                                   " aborting failover." % dev.iv_name)
2549

    
2550
    feedback_fn("* shutting down instance on source node")
2551
    logger.Info("Shutting down instance %s on node %s" %
2552
                (instance.name, source_node))
2553

    
2554
    if not rpc.call_instance_shutdown(source_node, instance):
2555
      if self.op.ignore_consistency:
2556
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2557
                     " anyway. Please make sure node %s is down"  %
2558
                     (instance.name, source_node, source_node))
2559
      else:
2560
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2561
                                 (instance.name, source_node))
2562

    
2563
    feedback_fn("* deactivating the instance's disks on source node")
2564
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2565
      raise errors.OpExecError("Can't shut down the instance's disks.")
2566

    
2567
    instance.primary_node = target_node
2568
    # distribute new instance config to the other nodes
2569
    self.cfg.AddInstance(instance)
2570

    
2571
    feedback_fn("* activating the instance's disks on target node")
2572
    logger.Info("Starting instance %s on node %s" %
2573
                (instance.name, target_node))
2574

    
2575
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2576
                                             ignore_secondaries=True)
2577
    if not disks_ok:
2578
      _ShutdownInstanceDisks(instance, self.cfg)
2579
      raise errors.OpExecError("Can't activate the instance's disks")
2580

    
2581
    feedback_fn("* starting the instance on the target node")
2582
    if not rpc.call_instance_start(target_node, instance, None):
2583
      _ShutdownInstanceDisks(instance, self.cfg)
2584
      raise errors.OpExecError("Could not start instance %s on node %s." %
2585
                               (instance.name, target_node))
2586

    
2587

    
2588
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2589
  """Create a tree of block devices on the primary node.
2590

2591
  This always creates all devices.
2592

2593
  """
2594
  if device.children:
2595
    for child in device.children:
2596
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2597
        return False
2598

    
2599
  cfg.SetDiskID(device, node)
2600
  new_id = rpc.call_blockdev_create(node, device, device.size,
2601
                                    instance.name, True, info)
2602
  if not new_id:
2603
    return False
2604
  if device.physical_id is None:
2605
    device.physical_id = new_id
2606
  return True
2607

    
2608

    
2609
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2610
  """Create a tree of block devices on a secondary node.
2611

2612
  If this device type has to be created on secondaries, create it and
2613
  all its children.
2614

2615
  If not, just recurse to children keeping the same 'force' value.
2616

2617
  """
2618
  if device.CreateOnSecondary():
2619
    force = True
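  # a device that must exist on secondaries also forces creation of all of
  # its children there, so the force flag is propagated down the recursion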
2620
  if device.children:
2621
    for child in device.children:
2622
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2623
                                        child, force, info):
2624
        return False
2625

    
2626
  if not force:
2627
    return True
2628
  cfg.SetDiskID(device, node)
2629
  new_id = rpc.call_blockdev_create(node, device, device.size,
2630
                                    instance.name, False, info)
2631
  if not new_id:
2632
    return False
2633
  if device.physical_id is None:
2634
    device.physical_id = new_id
2635
  return True
2636

    
2637

    
2638
def _GenerateUniqueNames(cfg, exts):
2639
  """Generate a suitable LV name.
2640

2641
  This will generate a logical volume name for the given instance.
2642

2643
  """
2644
  results = []
2645
  for val in exts:
2646
    new_id = cfg.GenerateUniqueID()
2647
    results.append("%s%s" % (new_id, val))
2648
  return results
2649

    
2650

    
2651
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2652
  """Generate a drbd device complete with its children.
2653

2654
  """
2655
  port = cfg.AllocatePort()
2656
  vgname = cfg.GetVGName()
2657
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2658
                          logical_id=(vgname, names[0]))
2659
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2660
                          logical_id=(vgname, names[1]))
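  # each DRBD device gets a fixed 128 MB metadata volume next to its data LV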
2661
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2662
                          logical_id = (primary, secondary, port),
2663
                          children = [dev_data, dev_meta])
2664
  return drbd_dev
2665

    
2666

    
2667
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2668
  """Generate a drbd8 device complete with its children.
2669

2670
  """
2671
  port = cfg.AllocatePort()
2672
  vgname = cfg.GetVGName()
2673
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2674
                          logical_id=(vgname, names[0]))
2675
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2676
                          logical_id=(vgname, names[1]))
2677
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2678
                          logical_id = (primary, secondary, port),
2679
                          children = [dev_data, dev_meta],
2680
                          iv_name=iv_name)
2681
  return drbd_dev
2682

    
2683

    
2684
def _GenerateDiskTemplate(cfg, template_name,
2685
                          instance_name, primary_node,
2686
                          secondary_nodes, disk_sz, swap_sz):
2687
  """Generate the entire disk layout for a given template type.
2688

2689
  """
2690
  #TODO: compute space requirements
2691

    
2692
  vgname = cfg.GetVGName()
2693
  if template_name == constants.DT_DISKLESS:
2694
    disks = []
2695
  elif template_name == constants.DT_PLAIN:
2696
    if len(secondary_nodes) != 0:
2697
      raise errors.ProgrammerError("Wrong template configuration")
2698

    
2699
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2700
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2701
                           logical_id=(vgname, names[0]),
2702
                           iv_name = "sda")
2703
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2704
                           logical_id=(vgname, names[1]),
2705
                           iv_name = "sdb")
2706
    disks = [sda_dev, sdb_dev]
2707
  elif template_name == constants.DT_LOCAL_RAID1:
2708
    if len(secondary_nodes) != 0:
2709
      raise errors.ProgrammerError("Wrong template configuration")
2710

    
2711

    
2712
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2713
                                       ".sdb_m1", ".sdb_m2"])
2714
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2715
                              logical_id=(vgname, names[0]))
2716
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2717
                              logical_id=(vgname, names[1]))
2718
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2719
                              size=disk_sz,
2720
                              children = [sda_dev_m1, sda_dev_m2])
2721
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2722
                              logical_id=(vgname, names[2]))
2723
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2724
                              logical_id=(vgname, names[3]))
2725
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2726
                              size=swap_sz,
2727
                              children = [sdb_dev_m1, sdb_dev_m2])
2728
    disks = [md_sda_dev, md_sdb_dev]
2729
  elif template_name == constants.DT_REMOTE_RAID1:
2730
    if len(secondary_nodes) != 1:
2731
      raise errors.ProgrammerError("Wrong template configuration")
2732
    remote_node = secondary_nodes[0]
2733
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2734
                                       ".sdb_data", ".sdb_meta"])
2735
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2736
                                         disk_sz, names[0:2])
2737
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2738
                              children = [drbd_sda_dev], size=disk_sz)
2739
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2740
                                         swap_sz, names[2:4])
2741
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2742
                              children = [drbd_sdb_dev], size=swap_sz)
2743
    disks = [md_sda_dev, md_sdb_dev]
2744
  elif template_name == constants.DT_DRBD8:
2745
    if len(secondary_nodes) != 1:
2746
      raise errors.ProgrammerError("Wrong template configuration")
2747
    remote_node = secondary_nodes[0]
2748
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2749
                                       ".sdb_data", ".sdb_meta"])
2750
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2751
                                         disk_sz, names[0:2], "sda")
2752
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2753
                                         swap_sz, names[2:4], "sdb")
2754
    disks = [drbd_sda_dev, drbd_sdb_dev]
2755
  else:
2756
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2757
  return disks
2758

    
2759

    
2760
def _GetInstanceInfoText(instance):
2761
  """Compute that text that should be added to the disk's metadata.
2762

2763
  """
2764
  return "originstname+%s" % instance.name
2765

    
2766

    
2767
def _CreateDisks(cfg, instance):
2768
  """Create all disks for an instance.
2769

2770
  This abstracts away some work from AddInstance.
2771

2772
  Args:
2773
    instance: the instance object
2774

2775
  Returns:
2776
    True or False showing the success of the creation process
2777

2778
  """
2779
  info = _GetInstanceInfoText(instance)
2780

    
2781
  for device in instance.disks:
2782
    logger.Info("creating volume %s for instance %s" %
2783
              (device.iv_name, instance.name))
2784
    #HARDCODE
2785
    for secondary_node in instance.secondary_nodes:
2786
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2787
                                        device, False, info):
2788
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2789
                     (device.iv_name, device, secondary_node))
2790
        return False
2791
    #HARDCODE
2792
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2793
                                    instance, device, info):
2794
      logger.Error("failed to create volume %s on primary!" %
2795
                   device.iv_name)
2796
      return False
2797
  return True
2798

    
2799

    
2800
def _RemoveDisks(instance, cfg):
2801
  """Remove all disks for an instance.
2802

2803
  This abstracts away some work from `AddInstance()` and
2804
  `RemoveInstance()`. Note that in case some of the devices couldn't
2805
  be removed, the removal will continue with the other ones (compare
2806
  with `_CreateDisks()`).
2807

2808
  Args:
2809
    instance: the instance object
2810

2811
  Returns:
2812
    True or False showing the success of the removal process
2813

2814
  """
2815
  logger.Info("removing block devices for instance %s" % instance.name)
2816

    
2817
  result = True
2818
  for device in instance.disks:
2819
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2820
      cfg.SetDiskID(disk, node)
2821
      if not rpc.call_blockdev_remove(node, disk):
2822
        logger.Error("could not remove block device %s on node %s,"
2823
                     " continuing anyway" %
2824
                     (device.iv_name, node))
2825
        result = False
2826
  return result
2827

    
2828

    
2829
class LUCreateInstance(LogicalUnit):
2830
  """Create an instance.
2831

2832
  """
2833
  HPATH = "instance-add"
2834
  HTYPE = constants.HTYPE_INSTANCE
2835
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2836
              "disk_template", "swap_size", "mode", "start", "vcpus",
2837
              "wait_for_sync", "ip_check", "mac"]
2838

    
2839
  def BuildHooksEnv(self):
2840
    """Build hooks env.
2841

2842
    This runs on master, primary and secondary nodes of the instance.
2843

2844
    """
2845
    env = {
2846
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2847
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2848
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2849
      "INSTANCE_ADD_MODE": self.op.mode,
2850
      }
2851
    if self.op.mode == constants.INSTANCE_IMPORT:
2852
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2853
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2854
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2855

    
2856
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2857
      primary_node=self.op.pnode,
2858
      secondary_nodes=self.secondaries,
2859
      status=self.instance_status,
2860
      os_type=self.op.os_type,
2861
      memory=self.op.mem_size,
2862
      vcpus=self.op.vcpus,
2863
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2864
    ))
2865

    
2866
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2867
          self.secondaries)
2868
    return env, nl, nl
2869

    
2870

    
2871
  def CheckPrereq(self):
2872
    """Check prerequisites.
2873

2874
    """
2875
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2876
      if not hasattr(self.op, attr):
2877
        setattr(self.op, attr, None)
2878

    
2879
    if self.op.mode not in (constants.INSTANCE_CREATE,
2880
                            constants.INSTANCE_IMPORT):
2881
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2882
                                 self.op.mode)
2883

    
2884
    if self.op.mode == constants.INSTANCE_IMPORT:
2885
      src_node = getattr(self.op, "src_node", None)
2886
      src_path = getattr(self.op, "src_path", None)
2887
      if src_node is None or src_path is None:
2888
        raise errors.OpPrereqError("Importing an instance requires source"
2889
                                   " node and path options")
2890
      src_node_full = self.cfg.ExpandNodeName(src_node)
2891
      if src_node_full is None:
2892
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2893
      self.op.src_node = src_node = src_node_full
2894

    
2895
      if not os.path.isabs(src_path):
2896
        raise errors.OpPrereqError("The source path must be absolute")
2897

    
2898
      export_info = rpc.call_export_info(src_node, src_path)
2899

    
2900
      if not export_info:
2901
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2902

    
2903
      if not export_info.has_section(constants.INISECT_EXP):
2904
        raise errors.ProgrammerError("Corrupted export config")
2905

    
2906
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2907
      if (int(ei_version) != constants.EXPORT_VERSION):
2908
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2909
                                   (ei_version, constants.EXPORT_VERSION))
2910

    
2911
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2912
        raise errors.OpPrereqError("Can't import instance with more than"
2913
                                   " one data disk")
2914

    
2915
      # FIXME: are the old os-es, disk sizes, etc. useful?
2916
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2917
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2918
                                                         'disk0_dump'))
2919
      self.src_image = diskimage
2920
    else: # INSTANCE_CREATE
2921
      if getattr(self.op, "os_type", None) is None:
2922
        raise errors.OpPrereqError("No guest OS specified")
2923

    
2924
    # check primary node
2925
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2926
    if pnode is None:
2927
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2928
                                 self.op.pnode)
2929
    self.op.pnode = pnode.name
2930
    self.pnode = pnode
2931
    self.secondaries = []
2932
    # disk template and mirror node verification
2933
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2934
      raise errors.OpPrereqError("Invalid disk template name")
2935

    
2936
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2937
      if getattr(self.op, "snode", None) is None:
2938
        raise errors.OpPrereqError("The networked disk templates need"
2939
                                   " a mirror node")
2940

    
2941
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2942
      if snode_name is None:
2943
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2944
                                   self.op.snode)
2945
      elif snode_name == pnode.name:
2946
        raise errors.OpPrereqError("The secondary node cannot be"
2947
                                   " the primary node.")
2948
      self.secondaries.append(snode_name)
2949

    
2950
    # Required free disk space as a function of disk and swap space
2951
    req_size_dict = {
2952
      constants.DT_DISKLESS: None,
2953
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2954
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2955
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2956
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2957
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2958
    }
2959

    
2960
    if self.op.disk_template not in req_size_dict:
2961
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2962
                                   " is unknown" %  self.op.disk_template)
2963

    
2964
    req_size = req_size_dict[self.op.disk_template]
2965

    
2966
    # Check lv size requirements
2967
    if req_size is not None:
2968
      nodenames = [pnode.name] + self.secondaries
2969
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2970
      for node in nodenames:
2971
        info = nodeinfo.get(node, None)
2972
        if not info:
2973
          raise errors.OpPrereqError("Cannot get current information"
2974
                                     " from node '%s'" % nodeinfo)
2975
        vg_free = info.get('vg_free', None)
2976
        if not isinstance(vg_free, int):
2977
          raise errors.OpPrereqError("Can't compute free disk space on"
2978
                                     " node %s" % node)
2979
        if req_size > info['vg_free']:
2980
          raise errors.OpPrereqError("Not enough disk space on target node %s."
2981
                                     " %d MB available, %d MB required" %
2982
                                     (node, info['vg_free'], req_size))
2983

    
2984
    # os verification
2985
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2986
    if not os_obj:
2987
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2988
                                 " primary node"  % self.op.os_type)
2989

    
2990
    if self.op.kernel_path == constants.VALUE_NONE:
2991
      raise errors.OpPrereqError("Can't set instance kernel to none")
2992

    
2993
    # instance verification
2994
    hostname1 = utils.HostInfo(self.op.instance_name)
2995

    
2996
    self.op.instance_name = instance_name = hostname1.name
2997
    instance_list = self.cfg.GetInstanceList()
2998
    if instance_name in instance_list:
2999
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3000
                                 instance_name)
3001

    
3002
    ip = getattr(self.op, "ip", None)
3003
    if ip is None or ip.lower() == "none":
3004
      inst_ip = None
3005
    elif ip.lower() == "auto":
3006
      inst_ip = hostname1.ip
3007
    else:
3008
      if not utils.IsValidIP(ip):
3009
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3010
                                   " like a valid IP" % ip)
3011
      inst_ip = ip
3012
    self.inst_ip = inst_ip
3013

    
3014
    if self.op.start and not self.op.ip_check:
3015
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3016
                                 " adding an instance in start mode")
3017

    
3018
    if self.op.ip_check:
3019
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3020
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3021
                                   (hostname1.ip, instance_name))
3022

    
3023
    # MAC address verification
3024
    if self.op.mac != "auto":
3025
      if not utils.IsValidMac(self.op.mac.lower()):
3026
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3027
                                   self.op.mac)
3028

    
3029
    # bridge verification
3030
    bridge = getattr(self.op, "bridge", None)
3031
    if bridge is None:
3032
      self.op.bridge = self.cfg.GetDefBridge()
3033
    else:
3034
      self.op.bridge = bridge
3035

    
3036
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3037
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3038
                                 " destination node '%s'" %
3039
                                 (self.op.bridge, pnode.name))
3040

    
3041
    # boot order verification
3042
    if self.op.hvm_boot_order is not None:
3043
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3044
        raise errors.OpPrereqError("invalid boot order specified,"
3045
                                   " must be one or more of [acdn]")
3046

    
3047
    if self.op.start:
3048
      self.instance_status = 'up'
3049
    else:
3050
      self.instance_status = 'down'
3051

    
3052
  def Exec(self, feedback_fn):
3053
    """Create and add the instance to the cluster.
3054

3055
    """
3056
    instance = self.op.instance_name
3057
    pnode_name = self.pnode.name
3058

    
3059
    if self.op.mac == "auto":
3060
      mac_address = self.cfg.GenerateMAC()
3061
    else:
3062
      mac_address = self.op.mac
3063

    
3064
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3065
    if self.inst_ip is not None:
3066
      nic.ip = self.inst_ip
3067

    
3068
    ht_kind = self.sstore.GetHypervisorType()
3069
    if ht_kind in constants.HTS_REQ_PORT:
3070
      network_port = self.cfg.AllocatePort()
3071
    else:
3072
      network_port = None
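      # No network port is allocated for this hypervisor type; the
      # assumption here is that HTS_REQ_PORT lists the hypervisors whose
      # console (e.g. VNC for HVM) is served over the network and thus
      # needs a cluster-wide unique port.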

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
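    # The exact command depends on the hypervisor implementation (for Xen
    # it is typically something like "xm console <instance>" - illustrative
    # only); BuildCmd below wraps it in an ssh invocation to the primary
    # node with a forced tty.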

    # build ssh cmdline
    cmd = self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
    return cmd[0], cmd


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)
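    # Note: as used here, _GenerateMDDRBDBranch only builds the Disk
    # configuration object for a drbd branch backed by the freshly named
    # data/meta LVs between the primary and the remote node; the actual
    # block devices are only created by the calls below.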

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

    return 0

class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)

class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
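      # Illustrative example (names invented for clarity): for a disk whose
      # physical_id is ('xenvg', 'abc123.sda_data') and temp_suffix
      # 1189084800, ren_fn returns
      # ('xenvg', 'abc123.sda_data_replaced-1189084800').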
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)

class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }
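    # pstatus/sstatus above are the raw rpc.call_blockdev_find results from
    # the primary and (optional) secondary node; their exact layout is
    # defined by the backend (other code in this module reads index 5 as
    # the "degraded" flag), so treat the format as an implementation detail.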

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result

class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                    " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                    " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)

class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []
4305

    
4306
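    # Overall flow: (optionally) with the instance shut down, snapshot its
    # "sda" disk, restart the instance as early as possible (in the finally
    # clause), then copy the snapshot to the target node and remove it from
    # the source node.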
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) pairs matching the pattern.

    """
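    # The result is a list of (path, tag) pairs, e.g. (made-up values):
    #   [("/cluster", "prod"), ("/instances/inst1.example.com", "prod")]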
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
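    # Example (hypothetical): requesting deletion of tags "a" and "b" while
    # the object only carries "a" fails below with "Tag(s) 'b' not found".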
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
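      # The RPC result maps each node name to whatever that node returned;
      # a false/empty per-node value is treated as a failure below.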
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))