Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ b6023d6c

History | View | Annotate | Download (174 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  # hooks path element for this LU; None disables BuildHooksEnv entirely
  HPATH = None
  # hooks type (cluster/node/instance constant) matching HPATH
  HTYPE = None
  # opcode attributes that must be present (non-None) on the op
  _OP_REQP = []
  # whether the LU requires an initialized cluster
  REQ_CLUSTER = True
  # whether the LU must run on the master node (only checked if REQ_CLUSTER)
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    # lazily-created SshRunner cache, exposed through the 'ssh' property
    self.__ssh = None

    # fail early if any declared-required opcode parameter is missing
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object, creating it on first use.

    """
    if not self.__ssh:
      # 'ssh' here resolves to the module-level import, not the class
      # attribute defined below (method scope skips the class namespace)
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  # read-only property giving LUs a lazily-built SshRunner as self.ssh
  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in the
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
159

    
160

    
161
class NoHooksLU(LogicalUnit):
  """Base class for logical units that run no hooks.

  Deriving from this class lets an LU skip hook execution without
  re-implementing the same boilerplate in every subclass.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Return an empty hooks specification.

    Hooks are disabled for this LU, so there is no environment and no
    pre/post node lists.

    """
    return ({}, [], [])
178

    
179

    
180
def _AddHostToEtcHosts(hostname):
  """Ensure /etc/hosts has an entry for the given host.

  Thin wrapper around utils.SetEtcHostsEntry; resolves the host first
  to obtain its IP and short name.

  """
  host_info = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, host_info.ip, host_info.name,
                         [host_info.ShortName()])
186

    
187

    
188
def _RemoveHostFromEtcHosts(hostname):
  """Drop both the full and the short name of a host from /etc/hosts.

  Thin wrapper around utils.RemoveEtcHostsEntry.

  """
  host_info = utils.HostInfo(name=hostname)
  for entry in (host_info.name, host_info.ShortName()):
    utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, entry)
195

    
196

    
197
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  # empty/None means "all nodes in the cluster"
  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  expanded = []
  for node_name in nodes:
    full_name = lu.cfg.ExpandNodeName(node_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % node_name)
    expanded.append(full_name)
  return utils.NiceSort(expanded)
219

    
220

    
221
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # empty/None means "all instances in the cluster"
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  expanded = []
  for inst_name in instances:
    full_name = lu.cfg.ExpandInstanceName(inst_name)
    if full_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % inst_name)
    expanded.append(full_name)
  return utils.NiceSort(expanded)
243

    
244

    
245
def _CheckOutputFields(static, dynamic, selected):
246
  """Checks whether all selected fields are valid.
247

248
  Args:
249
    static: Static fields
250
    dynamic: Dynamic fields
251

252
  """
253
  static_fields = frozenset(static)
254
  dynamic_fields = frozenset(dynamic)
255

    
256
  all_fields = static_fields | dynamic_fields
257

    
258
  if not all_fields.issuperset(selected):
259
    raise errors.OpPrereqError("Unknown output fields selected: %s"
260
                               % ",".join(frozenset(selected).
261
                                          difference(all_fields)))
262

    
263

    
264
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
265
                          memory, vcpus, nics):
266
  """Builds instance related env variables for hooks from single variables.
267

268
  Args:
269
    secondary_nodes: List of secondary nodes as strings
270
  """
271
  env = {
272
    "OP_TARGET": name,
273
    "INSTANCE_NAME": name,
274
    "INSTANCE_PRIMARY": primary_node,
275
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
276
    "INSTANCE_OS_TYPE": os_type,
277
    "INSTANCE_STATUS": status,
278
    "INSTANCE_MEMORY": memory,
279
    "INSTANCE_VCPUS": vcpus,
280
  }
281

    
282
  if nics:
283
    nic_count = len(nics)
284
    for idx, (ip, bridge, mac) in enumerate(nics):
285
      if ip is None:
286
        ip = ""
287
      env["INSTANCE_NIC%d_IP" % idx] = ip
288
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
289
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
290
  else:
291
    nic_count = 0
292

    
293
  env["INSTANCE_NIC_COUNT"] = nic_count
294

    
295
  return env
296

    
297

    
298
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns:
    dict of hook environment variables (see _BuildInstanceHookEnv)

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # FIX: this previously passed instance.os, so the INSTANCE_STATUS hook
    # variable carried the OS name instead of the instance's run status
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
318

    
319

    
320
def _HasValidVG(vglist, vgname):
321
  """Checks if the volume group list is valid.
322

323
  A non-None return value means there's an error, and the return value
324
  is the error message.
325

326
  """
327
  vgsize = vglist.get(vgname, None)
328
  if vgsize is None:
329
    return "volume group '%s' missing" % vgname
330
  elif vgsize < 20480:
331
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
332
            (vgname, vgsize))
333
  return None
334

    
335

    
336
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # back up and remove any pre-existing key pair before regenerating
  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  # generate a fresh passphrase-less DSA key pair (-N "" = empty passphrase)
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  # authorize the new public key for logins as the Ganeti run-as user
  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()
366

    
367

    
368
def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  Args:
    ss: ssconf.SimpleStore instance used to persist the node password

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  # generate a self-signed certificate valid for five years; private key
  # and certificate are written to the same file
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  # the file holds the private key too, so make it owner read-only
  os.chmod(constants.SSL_CERT_FILE, 0400)

  # restart the node daemon so it picks up the new password and certificate
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))
397

    
398

    
399
def _CheckInstanceBridgesExist(instance):
  """Verify that all bridges needed by an instance's NICs exist.

  Raises errors.OpPrereqError if the instance's primary node reports
  any of the bridges as missing.

  """
  bridges = [nic.bridge for nic in instance.nics]
  target_node = instance.primary_node
  if not rpc.call_bridges_exist(target_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, target_node))
409

    
410

    
411
class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  # no cluster exists yet, so skip the usual cluster/master checks
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Also validates hypervisor type, IP/DNS setup, volume group, file
    storage dir, mac prefix and master netdev, and stores resolved
    host/cluster info on self for Exec.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        # NOTE(review): the two string literals below concatenate without a
        # space, producing "VNCpassword file" in the error message
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   "password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    # resolving to loopback means DNS/hosts is misconfigured
    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    # the resolved IP must actually belong to this host
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    # the cluster IP must not be in use yet
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    # a secondary IP different from the primary must also belong to this host
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    # create the file storage dir if it does not exist yet
    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    # mac prefix must look like the first three octets of a MAC address
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    # verify the master netdev actually exists on this host
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    Writes the simple store, generates the node password and SSL cert,
    starts the master IP, sets up SSH/hosts entries and writes the
    initial cluster configuration.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    # second whitespace-separated field of the pub file is the key itself
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
563

    
564

    
565
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # the master must be the only node left in the cluster
    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    # and no instances may remain
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Stops the master role, backs up the root SSH keys and tells the
    (sole remaining) master node to leave the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    # keep a backup of the keys that are about to be invalidated
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)
601

    
602

    
603
class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns True if any problem was found ("bad" flag).

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE(review): this loop variable shadows the 'node' parameter;
        # harmless here since 'node' is not used afterwards, but fragile
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    Returns True if any problem was found ("bad" flag).

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    # every LV the config says should exist must be visible on its node
    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # a non-down instance must actually be running on its primary node
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # and it must not be running anywhere else
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns True if any problem was found ("bad" flag).

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns True if any problem was found ("bad" flag).

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Returns True if any problem was found ("bad" flag).

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns 0 if no problems were found, 1 otherwise.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # gather all per-node data in bulk via RPC up front
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      # accumulate the "bad" flag across all per-node results
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          # NOTE(review): unlike the primary-node branch above, this reports
          # an ERROR but does not set 'bad' — confirm whether intentional
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)
946

    
947

    
948
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns a tuple (nodes, node_lvm_errors, instances, missing) where:
      - nodes is the list of nodes that could not be contacted or that
        returned invalid data
      - node_lvm_errors maps node names to LV enumeration error strings
      - instances is the list of instance names having offline LVs
      - missing maps instance names to lists of (node, lv) pairs that
        were not found on their node

    """
    # the four containers below are aliased into the returned tuple
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # map each (node, lv) pair of the running, net-mirrored instances
    # to its owning instance object
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      # no relevant instances, everything is trivially healthy
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # FIX: a string result is an error message, not a volume map;
        # without this 'continue' we would crash on lvs.iteritems() below
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks run on the master node only, both before and after the
    rename.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    host = utils.HostInfo(self.op.name)

    new_name = host.name
    self.ip = new_ip = host.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()

    # refuse a rename that changes nothing
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")

    # when the IP changes, make sure nothing currently answers on it
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    new_name = self.op.name
    new_ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # record the new values in the simple store
      ss.SetKey(ss.SS_MASTER_IP, new_ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, new_name)

      # push the updated ssconf files to every other node
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always attempt to restore the master role, even on failure
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or any of its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # descend into children first; any LVM child makes the whole tree LVM
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  return disk.dev_type == constants.LD_LV
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks run on the master node only.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      # disabling LVM is only possible while no instance uses LVM disks
      for iname in self.cfg.GetInstanceList():
        inst = self.cfg.GetInstanceInfo(iname)
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    new_vg = self.op.vg_name
    if new_vg == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(new_vg)
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Repeatedly queries the primary node for mirror status until every
  device reports in-sync (or queries once only, if oneshot is set).
  Returns True if the disks ended up healthy (not degraded), False
  otherwise.

  """
  if not instance.disks:
    # no disks, nothing to wait for
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  pnode = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, pnode)

  failed_polls = 0
  while True:
    longest_est = 0
    all_synced = True
    degraded = False
    all_stats = rpc.call_blockdev_getmirrorstatus(pnode, instance.disks)
    if not all_stats:
      # node did not answer; retry up to ten times before giving up
      proc.LogWarning("Can't get any data from node %s" % pnode)
      failed_polls += 1
      if failed_polls >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % pnode)
      time.sleep(6)
      continue
    failed_polls = 0
    for idx, dev_stat in enumerate(all_stats):
      if dev_stat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (pnode, instance.disks[idx].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = dev_stat
      degraded = degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # this device is still syncing
        all_synced = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          longest_est = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[idx].iv_name, perc_done, rem_time))
    if all_synced or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      # sleep at most a minute between polls
      time.sleep(min(60, longest_est))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if all_synced:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not degraded
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  # index into the blockdev_find result: 5 is is_degraded, 6 is ldisk
  if ldisk:
    status_idx = 6
  else:
    status_idx = 5

  healthy = True
  if on_primary or dev.AssembleOnSecondary():
    found = rpc.call_blockdev_find(node, dev)
    if not found:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      healthy = False
    else:
      healthy = healthy and (not found[status_idx])

  # recurse into children; note ldisk is not passed down, so children
  # are always checked via the default is_degraded test
  if dev.children:
    for child in dev.children:
      healthy = healthy and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return healthy
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, os_list in rlist.iteritems():
      if not os_list:
        continue
      for os_obj in os_list:
        if os_obj.name not in all_os:
          # seed every node with an empty list so nodes that did not
          # report this OS still appear in the result
          all_os[os_obj.name] = dict([(nname, []) for nname in node_list])
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")

    os_map = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, per_node in os_map.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          field_val = os_name
        elif field == "valid":
          # valid iff every node returned a non-empty, truthy OS entry
          field_val = utils.all([osl and osl[0] for osl in per_node.values()])
        elif field == "node_status":
          field_val = {}
          for node_name, nos_list in per_node.iteritems():
            field_val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(field_val)
      output.append(row)

    return output
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: use call syntax instead of the deprecated 'raise Class, args'
      # form, for consistency with every other raise in this module
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # the node must not host any instance, either as primary or secondary
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    # stop the node daemon on the target node
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that need a live RPC query to the nodes
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      # FIX: build a fresh dict per node; dict.fromkeys(nodenames, {})
      # would make every key share one and the same mutable dict object
      live_data = dict([(name, {}) for name in nodenames])

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    instances = [self.cfg.GetInstanceInfo(iname) for iname
                 in self.cfg.GetInstanceList()]

    # per-instance map of node name -> list of LV names
    lv_map = dict([(inst, inst.MapLVsByNode()) for inst in instances])

    def _FindOwner(node, lv_name):
      # return the name of the first instance owning the LV, or '-'
      for inst in instances:
        if node in lv_map[inst] and lv_name in lv_map[inst][node]:
          return inst.name
      return '-'

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      vols = volumes[node][:]
      vols.sort(key=lambda vol: vol['dev'])

      for vol in vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            value = node
          elif field == "phys":
            value = vol['dev']
          elif field == "vg":
            value = vol['vg']
          elif field == "name":
            value = vol['name']
          elif field == "size":
            value = int(float(vol['size']))
          elif field == "instance":
            value = _FindOwner(node, vol['name'])
          else:
            raise errors.ParameterError(field)
          row.append(str(value))

        output.append(row)

    return output
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # single-homed node: secondary IP defaults to the primary one
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # a re-added node must keep its previous IP configuration
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    # FIX: raw strings for regex patterns (plain '\.' is an invalid
    # string escape and a warning on modern interpreters)
    if not re.match(r'^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity; give the daemon a moment to come up
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for keyfile in keyfiles:
      f = open(keyfile, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # verify the node can actually reach itself on the secondary IP
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
1823
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  # must run on the prospective (non-master) node
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      # fixed grammar: "This commands" -> "This command"
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # best-effort: proceed with the failover but ask the operator to
      # deconfigure the old master by hand (message previously read
      # "could disable", missing the "not")
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # record the new master in the simple store and push the updated
    # file to every node so they agree on who the master is
    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
1894
class LUQueryClusterInfo(NoHooksLU):
  """Query static cluster configuration data.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """Nothing to check for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Assemble and return the cluster configuration summary.

    """
    cluster_info = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }
    return cluster_info
1925
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    fname = self.op.filename
    local_name = utils.HostInfo().name

    for target in self.nodes:
      # the master already holds the file; skip copying to ourselves
      if target == local_name:
        continue
      copied = self.ssh.CopyFileToNode(target, fname)
      if not copied:
        logger.Error("Copy of file %s to node %s failed" % (fname, target))
1964
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Nothing to verify for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the serialized form of the cluster configuration.

    """
    # delegate entirely to the config writer
    return self.cfg.DumpConfig()
1983
class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # run on the master last, so a disruptive command (e.g. a reboot)
    # does not cut us off before the other nodes have been reached
    master = self.sstore.GetMasterNode()
    if master in self.nodes:
      self.nodes.remove(master)
      self.nodes.append(master)

    outputs = []
    for nname in self.nodes:
      cmd_result = self.ssh.Run(nname, "root", self.op.command)
      outputs.append((nname, cmd_result.output, cmd_result.exit_code))

    return outputs
2015
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    ok, dev_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not ok:
      raise errors.OpExecError("Cannot activate block devices")

    # mapping of node devices to instance-visible devices
    return dev_info
2046
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: the cluster ConfigWriter, used to set physical disk IDs per node
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         suceeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        # secondary-node failures are fatal only if the caller did not
        # explicitly ask to ignore them
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    # NOTE(review): 'result' here is whatever the last inner iteration
    # left behind; if the primary node never appears in the node tree
    # this reuses a stale (or, on the first disk, undefined) value --
    # confirm ComputeNodeTree always yields the primary node
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
2108
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  assembled, _unused_info = _AssembleInstanceDisks(instance, cfg,
                                                   ignore_secondaries=force)
  if not assembled:
    # roll back whatever was brought up before reporting the failure
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
2122
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    Refuses to shut down the block devices while the instance is
    reported as running on its primary node.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # a non-list answer means the RPC failed (node unreachable);
    # use isinstance instead of the non-idiomatic 'type(x) is list'
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
2159
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise a shutdown failure on any node makes the function return
  false.  (The previous docstring stated the inverse.)

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # only a primary-node failure may be forgiven, and only when
        # the caller asked for it
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
2180
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  avail_mem = nodeinfo[node].get('memory_free')
  if not isinstance(avail_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, avail_mem))
  if requested > avail_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, avail_mem))
2210
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    node_list = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
                 list(self.instance.secondary_nodes))
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(inst)

    # make sure the primary node can actually host the instance
    _CheckNodeFreeMemory(self.cfg, inst.primary_node,
                         "starting instance %s" % inst.name,
                         inst.memory)

    self.instance = inst
    self.op.instance_name = inst.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    inst = self.instance
    force_start = self.op.force
    extra = getattr(self.op, "extra_args", "")

    # mark the instance as up first, so the config reflects the intent
    self.cfg.MarkInstanceUp(inst.name)

    pnode = inst.primary_node

    _StartInstanceDisks(self.cfg, inst, force_start)

    if not rpc.call_instance_start(pnode, inst, extra):
      # undo the disk activation before failing
      _ShutdownInstanceDisks(inst, self.cfg)
      raise errors.OpExecError("Could not start instance")
2273
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    node_list = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
                 list(self.instance.secondary_nodes))
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(inst)

    self.instance = inst
    self.op.instance_name = inst.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    inst = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra = getattr(self.op, "extra_args", "")

    pnode = inst.primary_node

    valid_types = [constants.INSTANCE_REBOOT_SOFT,
                   constants.INSTANCE_REBOOT_HARD,
                   constants.INSTANCE_REBOOT_FULL]
    if reboot_type not in valid_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # soft/hard reboots are handled entirely by the node daemon
      if not rpc.call_instance_reboot(pnode, inst, reboot_type, extra):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # a full reboot means a complete stop/start cycle, disks included
      if not rpc.call_instance_shutdown(pnode, inst):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(inst, self.cfg)
      _StartInstanceDisks(self.cfg, inst, ignore_secondaries)
      if not rpc.call_instance_start(pnode, inst, extra):
        _ShutdownInstanceDisks(inst, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(inst.name)
2349
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    node_list = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
                 list(self.instance.secondary_nodes))
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    inst = self.instance
    pnode = inst.primary_node
    # record the intended state first; a failed shutdown is only logged
    self.cfg.MarkInstanceDown(inst.name)
    if not rpc.call_instance_shutdown(pnode, inst):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(inst, self.cfg)
2394
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the node daemon that the instance really is down
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # bug fix: this opcode has no 'pnode' attribute (_OP_REQP only
        # requires instance_name), so the old "self.op.pnode" reference
        # raised AttributeError instead of the intended error message
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      # switch the recorded OS before re-running the create scripts
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always deactivate the disks, even if the OS install failed
      _ShutdownInstanceDisks(inst, self.cfg)
2473
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name resolves and is not already taken (by another
    instance, or — unless ignore_ip is set — by a live host at the
    resolved IP).

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # ask the node daemon as well, in case the config is stale
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    # normalize to the fully-resolved name
    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      # a *successful* ping means the target IP is already in use
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    Renames the instance in the configuration, moves the file-storage
    directory for file-based instances, and runs the OS rename script.

    """
    inst = self.instance
    old_name = inst.name

    # remember the file-storage path before the rename changes it
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # NOTE(review): message reads "Could run" — likely a typo for
        # "Could not run"; left unchanged as it is a runtime string
        msg = ("Could run OS rename script for instance %s on node %s (but the"
               " instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
2577
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    # robustness fix: ignore_failures is not in _OP_REQP, so it may be
    # missing from the opcode; default to strict behavior instead of
    # raising AttributeError mid-removal (same pattern as extra_args
    # elsewhere in this file)
    ignore_failures = getattr(self.op, "ignore_failures", False)

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
2636
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  Builds one output row (list of field values) per requested instance,
  in the order given by self.op.output_fields.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # Fields whose values can only be obtained by querying the nodes
    # live (as opposed to reading the cluster configuration).
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    # Only pay the cost of the live RPC query if at least one requested
    # field actually needs live data.
    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # NOTE(review): a False result apparently marks an unreachable
          # node (vs. an empty dict when no instance is alive there) --
          # confirm against rpc.call_all_instances_info.
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          # configured (desired) state, regardless of live state
          val = (instance.status != "down")
        elif field == "oper_state":
          # live state; None when the primary node couldn't be queried
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combined admin + live state
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          # live memory usage; "-" when the instance isn't running
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          # field[:3] is "sda" or "sdb"
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
2751

    
2752

    
2753
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Moves a network-mirrored instance from its primary node to its
  (single) secondary node.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, is network
    mirrored, and that its target node has enough memory and the
    required bridges.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # failover only makes sense when the disks exist on both nodes
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    # Degraded disks on the target mean the failover would lose data;
    # only a down instance or ignore_consistency overrides this.
    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    # With ignore_consistency we proceed even if the shutdown failed
    # (e.g. the source node is dead) -- the operator is warned instead.
    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    # Flip the primary node in the configuration BEFORE restarting, so
    # the cluster config reflects the new location.
    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        # roll back the disk activation before giving up
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
2867

    
2868

    
2869
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Recursively create a block device tree on the primary node.

  Children are created first (bottom-up), then the device itself;
  every device in the tree is created unconditionally.

  Returns True on success, False as soon as any creation fails.

  """
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  result = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not result:
    return False
  # remember the backend id, but never overwrite an existing one
  if device.physical_id is None:
    device.physical_id = result
  return True
2888

    
2889

    
2890
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children; otherwise just recurse into the children with the
  caller's 'force' value unchanged.

  Returns True on success, False as soon as any creation fails.

  """
  force = force or device.CreateOnSecondary()
  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      return False

  if not force:
    # nothing to create at this level
    return True
  cfg.SetDiskID(device, node)
  result = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not result:
    return False
  # remember the backend id, but never overwrite an existing one
  if device.physical_id is None:
    device.physical_id = result
  return True
2917

    
2918

    
2919
def _GenerateUniqueNames(cfg, exts):
2920
  """Generate a suitable LV name.
2921

2922
  This will generate a logical volume name for the given instance.
2923

2924
  """
2925
  results = []
2926
  for val in exts:
2927
    new_id = cfg.GenerateUniqueID()
2928
    results.append("%s%s" % (new_id, val))
2929
  return results
2930

    
2931

    
2932
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd (remote_raid1) device with its backing LVs.

  names[0] is used for the data LV, names[1] for the 128 MB meta LV.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  children = [
    objects.Disk(dev_type=constants.LD_LV, size=size,
                 logical_id=(vgname, names[0])),
    objects.Disk(dev_type=constants.LD_LV, size=128,
                 logical_id=(vgname, names[1])),
    ]
  return objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                      logical_id=(primary, secondary, port),
                      children=children)
2946

    
2947

    
2948
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its backing LVs.

  names[0] is used for the data LV, names[1] for the 128 MB meta LV;
  iv_name becomes the exported device name (e.g. "sda").

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  children = [
    objects.Disk(dev_type=constants.LD_LV, size=size,
                 logical_id=(vgname, names[0])),
    objects.Disk(dev_type=constants.LD_LV, size=128,
                 logical_id=(vgname, names[1])),
    ]
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port),
                      children=children, iv_name=iv_name)
2963

    
2964

    
2965
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Returns the list of top-level Disk objects for the instance (empty
  for the diskless template).  Raises ProgrammerError when the number
  of secondary nodes doesn't match the template, or the template is
  unknown.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")
    lv_names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    return [
      objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                   logical_id=(vgname, lv_names[0]), iv_name="sda"),
      objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                   logical_id=(vgname, lv_names[1]), iv_name="sdb"),
      ]

  if template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    lv_names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                          ".sdb_data", ".sdb_meta"])
    return [
      _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                           disk_sz, lv_names[0:2], "sda"),
      _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                           swap_sz, lv_names[2:4], "sdb"),
      ]

  if template_name == constants.DT_FILE:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")
    return [
      objects.Disk(dev_type=constants.LD_FILE, size=disk_sz, iv_name="sda",
                   logical_id=(file_driver,
                               "%s/sda" % file_storage_dir)),
      objects.Disk(dev_type=constants.LD_FILE, size=swap_sz, iv_name="sdb",
                   logical_id=(file_driver,
                               "%s/sdb" % file_storage_dir)),
      ]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3014

    
3015

    
3016
def _GetInstanceInfoText(instance):
3017
  """Compute that text that should be added to the disk's metadata.
3018

3019
  """
3020
  return "originstname+%s" % instance.name
3021

    
3022

    
3023
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  # File-based instances need their storage directory created on the
  # primary node before the file "disks" can be made inside it.
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    # a falsy result means the RPC itself failed (node unreachable)
    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    # result[0] carries the call's own success flag
    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  # Create secondaries before the primary for each disk, so mirrored
  # devices have their remote side available when the primary comes up.
  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
3068

    
3069

    
3070
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  # Best-effort: every failure is logged and remembered, but removal
  # continues with the remaining devices.
  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  # File-based instances also have a per-instance storage directory
  # that must be removed from the primary node.
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result
3105

    
3106

    
3107
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.  Returns the
  required free space in the VG in MB, or None for templates that use
  no LVM space.

  """
  # Required free disk space as a function of disk and swap space;
  # None means the template consumes no VG space.
  requirements = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  try:
    return requirements[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)
3127

    
3128

    
3129
class LUCreateInstance(LogicalUnit):
3130
  """Create an instance.
3131

3132
  """
3133
  HPATH = "instance-add"
3134
  HTYPE = constants.HTYPE_INSTANCE
3135
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3136
              "disk_template", "swap_size", "mode", "start", "vcpus",
3137
              "wait_for_sync", "ip_check", "mac"]
3138

    
3139
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    Fills in self.op.pnode (and self.op.snode when the allocator
    requires two nodes) from the iallocator's answer.

    Raises:
      errors.OpPrereqError: if the allocator fails or returns an
        unexpected number of nodes.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but only two
      # values were supplied previously, which raised a TypeError
      # instead of the intended error message.
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
3176

    
3177
  def BuildHooksEnv(self):
    """Build the environment for the instance-add hooks.

    The hooks run on the master node plus the instance's primary and
    secondary nodes.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    # import mode exposes the source of the instance data as well
    if self.op.mode == constants.INSTANCE_IMPORT:
      env.update({
        "INSTANCE_SRC_NODE": self.op.src_node,
        "INSTANCE_SRC_PATH": self.op.src_path,
        "INSTANCE_SRC_IMAGE": self.src_image,
        })

    common_env = _BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    )
    env.update(common_env)

    node_list = [self.sstore.GetMasterNode(), self.op.pnode]
    node_list.extend(self.secondaries)
    return env, node_list, node_list
3207

    
3208

    
3209
  def CheckPrereq(self):
    """Check prerequisites.

    Validates the creation mode and all instance parameters (name, IP,
    MAC, bridge, boot order, file storage), runs the iallocator when
    requested, verifies the chosen nodes (existence, free disk space,
    OS availability, bridges) and finally records the desired instance
    status.

    Raises:
      errors.OpPrereqError: on any failed validation.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # lvm-less clusters can only host templates that need no VG
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      # import mode: validate the export source and read its config
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks: None/"none" disables, "auto" resolves via DNS
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    # a reachable IP means something else already uses it
    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification ("auto" is resolved later, in Exec)
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    # exactly one of iallocator/pnode selects the placement method
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements (None means no VG space is needed)
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          # FIX: the message previously interpolated the whole result
          # dict (nodeinfo) instead of the offending node's name
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > vg_free:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, vg_free, req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # desired run state, used later by BuildHooksEnv and Exec
    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
3409

    
3410
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    The steps, in order (the ordering matters for rollback):
      1. resolve the MAC address and build the NIC object
      2. allocate a network (console) port if the hypervisor needs one
      3. compute the file storage directory and generate the disk layout
      4. create the physical disks (removed again on failure)
      5. add the instance to the cluster configuration
      6. optionally wait for the disks to sync, aborting on degradation
      7. run the OS create/import scripts (unless diskless)
      8. optionally start the instance

    Args:
      feedback_fn: callable used to report progress to the caller

    Raises:
      errors.OpExecError: on disk creation failure, degraded disks,
        OS add/import failure, or failure to start the instance
      errors.ProgrammerError: if self.op.mode is neither create nor
        import (already validated in CheckPrereq)

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    # "auto" means let the cluster pick a fresh MAC address
    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # some hypervisor types need a dedicated network port (e.g. for the
    # remote console); allocate one from the cluster pool in that case
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    # compute the block-device tree for the chosen disk template
    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any disks that were created before the failure
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # undo both the disks and the config entry added above
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        # "sda"/"sdb" are the fixed data/swap device names of this era
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
3520

    
3521

    
3522
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  Rather than opening the console itself, this LU computes and returns
  the ssh command line that must be run on the master node in order to
  reach the instance's console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies that the named instance exists in the cluster and caches
    its configuration object on self.instance.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst_config = self.cfg.GetInstanceInfo(expanded_name)
    if inst_config is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst_config

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    inst = self.instance
    pnode = inst.primary_node

    # ask the primary node which instances it is actually running
    running = rpc.call_instance_list([pnode])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)
    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    console_cmd = hypervisor.GetHypervisor().GetShellCommandForConsole(inst)

    # wrap the hypervisor's console command in an ssh invocation
    return self.ssh.BuildCmd(pnode, "root", console_cmd, batch=True, tty=True)
3566

    
3567

    
3568
class LUReplaceDisks(LogicalUnit):
3569
  """Replace the disks of an instance.
3570

3571
  """
3572
  HPATH = "mirrors-replace"
3573
  HTYPE = constants.HTYPE_INSTANCE
3574
  _OP_REQP = ["instance_name", "mode", "disks"]
3575

    
3576
  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    Runs the configured iallocator in relocation mode for this instance,
    stores the chosen node in self.op.remote_node and also returns it.

    Returns:
      the name of the selected new secondary node

    Raises:
      errors.OpPrereqError: if the allocator fails or returns an
        unexpected number of nodes

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # the format string has three placeholders; previously only two
      # values were passed, so this error path itself raised a TypeError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
    # return the selection too: CheckPrereq assigns this method's return
    # value back to self.op.remote_node, which previously clobbered the
    # allocator's choice with None
    return self.op.remote_node
3598

    
3599
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # replacement-specific variables first, then the generic per-instance
    # ones merged on top (same precedence as before)
    env = {"MODE": self.op.mode}
    env["NEW_SECONDARY"] = self.op.remote_node
    env["OLD_SECONDARY"] = self.instance.secondary_nodes[0]
    env.update(_BuildInstanceHookEnvByObject(self.instance))

    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    if self.op.remote_node is not None:
      node_list.append(self.op.remote_node)
    # hooks run on the same node list before and after the operation
    return env, node_list, node_list
3618

    
3619
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    layout supports replacement, resolves the new secondary (either
    given explicitly or computed via an iallocator) and derives the
    target/other node attributes used by the Exec variants.

    Side effects: sets self.instance, self.sec_node, self.remote_node_info,
    self.op.remote_node and, for drbd8, self.tgt_node/self.oth_node (and
    self.new_node in secondary-replacement mode).

    Raises:
      errors.OpPrereqError: on any failed validation
      errors.ProgrammerError: on an unhandled disk replace mode

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # _RunAllocator stores its result in self.op.remote_node itself;
      # assigning its return value here used to overwrite that choice
      # with the method's implicit None
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # all requested disks must actually belong to the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node
3709

    
3710
  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    This is the remote_raid1 (md over drbd7) variant.  For every disk:
      - a new drbd mirror component is created on the secondary, then on
        the primary, then attached to the md device (with manual rollback
        of the already-created pieces on each failure);
    then all devices are synced, checked for degradation, and finally the
    old mirror components are detached and their backing devices removed.

    Args:
      feedback_fn: progress-reporting callable (unused here; logging goes
        through the logger module)

    Raises:
      errors.OpExecError: on component creation failure or degraded
        devices after sync (partial cleanup may require manual action,
        as the error messages indicate)

    """
    instance = self.instance
    # maps iv_name -> (md device, old component, new component)
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        # roll back the component on both nodes before aborting
        logger.Error("Can't add mirror compoment to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      # persist the new component in the configuration per-disk, so a
      # crash mid-loop leaves the config consistent with reality
      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the blockdev_find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    # all new components are healthy: detach and delete the old ones
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      # the old drbd component spans two nodes (its logical_id holds them)
      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          # best-effort removal: log and keep going
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)
3804

    
3805
  def _ExecD8DiskOnly(self, feedback_fn):
3806
    """Replace a disk on the primary or secondary for dbrd8.
3807

3808
    The algorithm for replace is quite complicated:
3809
      - for each disk to be replaced:
3810
        - create new LVs on the target node with unique names
3811
        - detach old LVs from the drbd device
3812
        - rename old LVs to name_replaced.<time_t>
3813
        - rename new LVs to old LVs
3814
        - attach the new LVs (with the old names now) to the drbd device
3815
      - wait for sync across all devices
3816
      - for each modified disk:
3817
        - remove old LVs (which have the name name_replaces.<time_t>)
3818

3819
    Failures are not very well handled.
3820

3821
    """
3822
    steps_total = 6
3823
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3824
    instance = self.instance
3825
    iv_names = {}
3826
    vgname = self.cfg.GetVGName()
3827
    # start of work
3828
    cfg = self.cfg
3829
    tgt_node = self.tgt_node
3830
    oth_node = self.oth_node
3831

    
3832
    # Step: check device activation
3833
    self.proc.LogStep(1, steps_total, "check device existence")
3834
    info("checking volume groups")
3835
    my_vg = cfg.GetVGName()
3836
    results = rpc.call_vg_list([oth_node, tgt_node])
3837
    if not results:
3838
      raise errors.OpExecError("Can't list volume groups on the nodes")
3839
    for node in oth_node, tgt_node:
3840
      res = results.get(node, False)
3841
      if not res or my_vg not in res:
3842
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3843
                                 (my_vg, node))
3844
    for dev in instance.disks:
3845
      if not dev.iv_name in self.op.disks:
3846
        continue
3847
      for node in tgt_node, oth_node:
3848
        info("checking %s on %s" % (dev.iv_name, node))
3849
        cfg.SetDiskID(dev, node)
3850
        if not rpc.call_blockdev_find(node, dev):
3851
          raise errors.OpExecError("Can't find device %s on node %s" %
3852
                                   (dev.iv_name, node))
3853

    
3854
    # Step: check other node consistency
3855
    self.proc.LogStep(2, steps_total, "check peer consistency")
3856
    for dev in instance.disks:
3857
      if not dev.iv_name in self.op.disks:
3858
        continue
3859
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3860
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3861
                                   oth_node==instance.primary_node):
3862
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3863
                                 " to replace disks on this node (%s)" %
3864
                                 (oth_node, tgt_node))
3865

    
3866
    # Step: create new storage
3867
    self.proc.LogStep(3, steps_total, "allocate new storage")
3868
    for dev in instance.disks:
3869
      if not dev.iv_name in self.op.disks:
3870
        continue
3871
      size = dev.size
3872
      cfg.SetDiskID(dev, tgt_node)
3873
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3874
      names = _GenerateUniqueNames(cfg, lv_names)
3875
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3876
                             logical_id=(vgname, names[0]))
3877
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3878
                             logical_id=(vgname, names[1]))
3879
      new_lvs = [lv_data, lv_meta]
3880
      old_lvs = dev.children
3881
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3882
      info("creating new local storage on %s for %s" %
3883
           (tgt_node, dev.iv_name))
3884
      # since we *always* want to create this LV, we use the
3885
      # _Create...OnPrimary (which forces the creation), even if we
3886
      # are talking about the secondary node
3887
      for new_lv in new_lvs:
3888
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3889
                                        _GetInstanceInfoText(instance)):
3890
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3891
                                   " node '%s'" %
3892
                                   (new_lv.logical_id[1], tgt_node))
3893

    
3894
    # Step: for each lv, detach+rename*2+attach
3895
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3896
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3897
      info("detaching %s drbd from local storage" % dev.iv_name)
3898
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3899
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3900
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3901
      #dev.children = []
3902
      #cfg.Update(instance)
3903

    
3904
      # ok, we created the new LVs, so now we know we have the needed
3905
      # storage; as such, we proceed on the target node to rename
3906
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3907
      # using the assumption that logical_id == physical_id (which in
3908
      # turn is the unique_id on that node)
3909

    
3910
      # FIXME(iustin): use a better name for the replaced LVs
3911
      temp_suffix = int(time.time())
3912
      ren_fn = lambda d, suff: (d.physical_id[0],
3913
                                d.physical_id[1] + "_replaced-%s" % suff)
3914
      # build the rename list based on what LVs exist on the node
3915
      rlist = []
3916
      for to_ren in old_lvs:
3917
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3918
        if find_res is not None: # device exists
3919
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3920

    
3921
      info("renaming the old LVs on the target node")
3922
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3923
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3924
      # now we rename the new LVs to the old LVs
3925
      info("renaming the new LVs on the target node")
3926
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3927
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3928
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3929

    
3930
      for old, new in zip(old_lvs, new_lvs):
3931
        new.logical_id = old.logical_id
3932
        cfg.SetDiskID(new, tgt_node)
3933

    
3934
      for disk in old_lvs:
3935
        disk.logical_id = ren_fn(disk, temp_suffix)
3936
        cfg.SetDiskID(disk, tgt_node)
3937

    
3938
      # now that the new lvs have the old name, we can add them to the device
3939
      info("adding new mirror component on %s" % tgt_node)
3940
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3941
        for new_lv in new_lvs:
3942
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3943
            warning("Can't rollback device %s", hint="manually cleanup unused"
3944
                    " logical volumes")
3945
        raise errors.OpExecError("Can't add local storage to drbd")
3946

    
3947
      dev.children = new_lvs
3948
      cfg.Update(instance)
3949

    
3950
    # Step: wait for sync
3951

    
3952
    # this can fail as the old devices are degraded and _WaitForSync
3953
    # does a combined result over all disks, so we don't check its
3954
    # return value
3955
    self.proc.LogStep(5, steps_total, "sync devices")
3956
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3957

    
3958
    # so check manually all the devices
3959
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3960
      cfg.SetDiskID(dev, instance.primary_node)
3961
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3962
      if is_degr:
3963
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3964

    
3965
    # Step: remove old storage
3966
    self.proc.LogStep(6, steps_total, "removing old storage")
3967
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3968
      info("remove logical volumes for %s" % name)
3969
      for lv in old_lvs:
3970
        cfg.SetDiskID(lv, tgt_node)
3971
        if not rpc.call_blockdev_remove(tgt_node, lv):
3972
          warning("Can't remove old LV", hint="manually remove unused LVs")
3973
          continue