1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements (REQ_CLUSTER,
58
      REQ_MASTER); note that all commands require root permissions
59

60
  """
61
  HPATH = None
62
  HTYPE = None
63
  _OP_REQP = []
64
  REQ_CLUSTER = True
65
  REQ_MASTER = True
66

    
67
  def __init__(self, processor, op, cfg, sstore):
68
    """Constructor for LogicalUnit.
69

70
    This needs to be overridden in derived classes in order to check op
71
    validity.
72

73
    """
74
    self.proc = processor
75
    self.op = op
76
    self.cfg = cfg
77
    self.sstore = sstore
78
    self.__ssh = None
79

    
80
    for attr_name in self._OP_REQP:
81
      attr_val = getattr(op, attr_name, None)
82
      if attr_val is None:
83
        raise errors.OpPrereqError("Required parameter '%s' missing" %
84
                                   attr_name)
85
    if self.REQ_CLUSTER:
86
      if not cfg.IsCluster():
87
        raise errors.OpPrereqError("Cluster not initialized yet,"
88
                                   " use 'gnt-cluster init' first.")
89
      if self.REQ_MASTER:
90
        master = sstore.GetMasterNode()
91
        if master != utils.HostInfo().name:
92
          raise errors.OpPrereqError("Commands must be run on the master"
93
                                     " node %s" % master)
94

    
95
  def __GetSSH(self):
96
    """Returns the SshRunner object
97

98
    """
99
    if not self.__ssh:
100
      self.__ssh = ssh.SshRunner(self.sstore)
101
    return self.__ssh
102

    
103
  ssh = property(fget=__GetSSH)
104

    
105
  def CheckPrereq(self):
106
    """Check prerequisites for this LU.
107

108
    This method should check that the prerequisites for the execution
109
    of this LU are fulfilled. It can do internode communication, but
110
    it should be idempotent - no cluster or system changes are
111
    allowed.
112

113
    The method should raise errors.OpPrereqError in case something is
114
    not fulfilled. Its return value is ignored.
115

116
    This method should also update all the parameters of the opcode to
117
    their canonical form; e.g. a short node name must be fully
118
    expanded after this method has successfully completed (so that
119
    hooks, logging, etc. work correctly).
120

121
    """
122
    raise NotImplementedError
123

    
124
  def Exec(self, feedback_fn):
125
    """Execute the LU.
126

127
    This method should implement the actual work. It should raise
128
    errors.OpExecError for failures that are somewhat dealt with in
129
    code, or expected.
130

131
    """
132
    raise NotImplementedError
133

    
134
  def BuildHooksEnv(self):
135
    """Build hooks environment for this LU.
136

137
    This method should return a three-element tuple consisting of: a dict
138
    containing the environment that will be used for running the
139
    specific hook for this LU, a list of node names on which the hook
140
    should run before the execution, and a list of node names on which
141
    the hook should run after the execution.
142

143
    The keys of the dict must not be prefixed with 'GANETI_', as this will
144
    be handled in the hooks runner. Also note additional keys will be
145
    added by the hooks runner. If the LU doesn't define any
146
    environment, an empty dict (and not None) should be returned.
147

148
    As for the node lists, the master should not be included in
149
    them, as it will be added by the hooks runner in case this LU
150
    requires a cluster to run on (otherwise we don't have a node
151
    list). If there are no nodes, an empty list should be returned (and not
152
    None).
153

154
    Note that if the HPATH for a LU class is None, this function will
155
    not be called.
156

157
    """
158
    raise NotImplementedError
159

    
160

    
161
class NoHooksLU(LogicalUnit):
162
  """Simple LU which runs no hooks.
163

164
  This LU is intended as a parent for other LogicalUnits which will
165
  run no hooks, in order to reduce duplicate code.
166

167
  """
168
  HPATH = None
169
  HTYPE = None
170

    
171
  def BuildHooksEnv(self):
172
    """Build hooks env.
173

174
    This is a no-op, since we don't run hooks.
175

176
    """
177
    return {}, [], []
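# Illustrative sketch, not part of the original module: a minimal LogicalUnit
# subclass following the rules from the LogicalUnit docstring above. The
# opcode field used here ("node_name") is hypothetical.
#
#   class LUExampleNoop(NoHooksLU):
#     """Example LU: validates its single parameter and changes nothing."""
#     _OP_REQP = ["node_name"]
#
#     def CheckPrereq(self):
#       # canonicalise the name so that hooks/logging see the expanded form
#       node = self.cfg.ExpandNodeName(self.op.node_name)
#       if node is None:
#         raise errors.OpPrereqError("No such node name '%s'" %
#                                    self.op.node_name)
#       self.op.node_name = node
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Nothing to do for node %s" % self.op.node_name)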
178

    
179

    
180
def _AddHostToEtcHosts(hostname):
181
  """Wrapper around utils.SetEtcHostsEntry.
182

183
  """
184
  hi = utils.HostInfo(name=hostname)
185
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
186

    
187

    
188
def _RemoveHostFromEtcHosts(hostname):
189
  """Wrapper around utils.RemoveEtcHostsEntry.
190

191
  """
192
  hi = utils.HostInfo(name=hostname)
193
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
194
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
195

    
196

    
197
def _GetWantedNodes(lu, nodes):
198
  """Returns list of checked and expanded node names.
199

200
  Args:
201
    nodes: List of node names (strings); an empty list means all nodes
202

203
  """
204
  if not isinstance(nodes, list):
205
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
206

    
207
  if nodes:
208
    wanted = []
209

    
210
    for name in nodes:
211
      node = lu.cfg.ExpandNodeName(name)
212
      if node is None:
213
        raise errors.OpPrereqError("No such node name '%s'" % name)
214
      wanted.append(node)
215

    
216
  else:
217
    wanted = lu.cfg.GetNodeList()
218
  return utils.NiceSort(wanted)
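# For illustration (hypothetical names): each entry is expanded through
# cfg.ExpandNodeName and the result is returned NiceSort-ed; an empty list
# falls through to cfg.GetNodeList(), i.e. all known nodes.
#
#   _GetWantedNodes(lu, ["node2.example.com", "node1"])
#   # -> ["node1.example.com", "node2.example.com"] (assuming both resolve)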
219

    
220

    
221
def _GetWantedInstances(lu, instances):
222
  """Returns list of checked and expanded instance names.
223

224
  Args:
225
    instances: List of instance names (strings); an empty list means all
      instances
226

227
  """
228
  if not isinstance(instances, list):
229
    raise errors.OpPrereqError("Invalid argument type 'instances'")
230

    
231
  if instances:
232
    wanted = []
233

    
234
    for name in instances:
235
      instance = lu.cfg.ExpandInstanceName(name)
236
      if instance is None:
237
        raise errors.OpPrereqError("No such instance name '%s'" % name)
238
      wanted.append(instance)
239

    
240
  else:
241
    wanted = lu.cfg.GetInstanceList()
242
  return utils.NiceSort(wanted)
243

    
244

    
245
def _CheckOutputFields(static, dynamic, selected):
246
  """Checks whether all selected fields are valid.
247

248
  Args:
249
    static: Static fields
250
    dynamic: Dynamic fields
    selected: Fields selected by the user, which must be a subset of the above
251

252
  """
253
  static_fields = frozenset(static)
254
  dynamic_fields = frozenset(dynamic)
255

    
256
  all_fields = static_fields | dynamic_fields
257

    
258
  if not all_fields.issuperset(selected):
259
    raise errors.OpPrereqError("Unknown output fields selected: %s"
260
                               % ",".join(frozenset(selected).
261
                                          difference(all_fields)))
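# Illustrative usage, mirroring the query LUs further below: selecting a field
# that is neither static nor dynamic raises OpPrereqError naming the unknown
# fields.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])
#   # -> errors.OpPrereqError: Unknown output fields selected: bogus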
262

    
263

    
264
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
265
                          memory, vcpus, nics):
266
  """Builds instance related env variables for hooks from single variables.
267

268
  Args:
269
    secondary_nodes: List of secondary nodes as strings
270
  """
271
  env = {
272
    "OP_TARGET": name,
273
    "INSTANCE_NAME": name,
274
    "INSTANCE_PRIMARY": primary_node,
275
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
276
    "INSTANCE_OS_TYPE": os_type,
277
    "INSTANCE_STATUS": status,
278
    "INSTANCE_MEMORY": memory,
279
    "INSTANCE_VCPUS": vcpus,
280
  }
281

    
282
  if nics:
283
    nic_count = len(nics)
284
    for idx, (ip, bridge, mac) in enumerate(nics):
285
      if ip is None:
286
        ip = ""
287
      env["INSTANCE_NIC%d_IP" % idx] = ip
288
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
289
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
290
  else:
291
    nic_count = 0
292

    
293
  env["INSTANCE_NIC_COUNT"] = nic_count
294

    
295
  return env
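# Example of the resulting environment (hypothetical values): a single-NIC
# instance with no IP configured yields, among other keys:
#
#   env = _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"],
#                               "debian-etch", "up", 128, 1,
#                               [(None, "xen-br0", "aa:00:00:12:34:56")])
#   # env["INSTANCE_SECONDARIES"] == "node2"
#   # env["INSTANCE_NIC_COUNT"] == 1
#   # env["INSTANCE_NIC0_IP"] == ""   (a None IP is mapped to an empty string)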
296

    
297

    
298
def _BuildInstanceHookEnvByObject(instance, override=None):
299
  """Builds instance related env variables for hooks from an object.
300

301
  Args:
302
    instance: objects.Instance object of instance
303
    override: dict of values to override
304
  """
305
  args = {
306
    'name': instance.name,
307
    'primary_node': instance.primary_node,
308
    'secondary_nodes': instance.secondary_nodes,
309
    'os_type': instance.os,
310
    'status': instance.status,
311
    'memory': instance.memory,
312
    'vcpus': instance.vcpus,
313
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
314
  }
315
  if override:
316
    args.update(override)
317
  return _BuildInstanceHookEnv(**args)
318

    
319

    
320
def _HasValidVG(vglist, vgname):
321
  """Checks if the volume group list is valid.
322

323
  A non-None return value means there's an error, and the return value
324
  is the error message.
325

326
  """
327
  vgsize = vglist.get(vgname, None)
328
  if vgsize is None:
329
    return "volume group '%s' missing" % vgname
330
  elif vgsize < 20480:
331
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
332
            (vgname, vgsize))
333
  return None
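# Sketch of the contract: vglist maps volume group names to their size in MiB
# (as returned by utils.ListVolumeGroups or rpc.call_vg_list); "xenvg" below
# is just an example name.
#
#   _HasValidVG({"xenvg": 102400}, "xenvg")  # -> None, i.e. valid
#   _HasValidVG({"xenvg": 10240}, "xenvg")   # -> "volume group 'xenvg' too small ..."
#   _HasValidVG({}, "xenvg")                 # -> "volume group 'xenvg' missing"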
334

    
335

    
336
def _InitSSHSetup(node):
337
  """Setup the SSH configuration for the cluster.
338

339

340
  This generates a dsa keypair for root, adds the pub key to the
341
  permitted hosts and adds the hostkey to its own known hosts.
342

343
  Args:
344
    node: the name of this host as a fqdn
345

346
  """
347
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
348

    
349
  for name in priv_key, pub_key:
350
    if os.path.exists(name):
351
      utils.CreateBackup(name)
352
    utils.RemoveFile(name)
353

    
354
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
355
                         "-f", priv_key,
356
                         "-q", "-N", ""])
357
  if result.failed:
358
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
359
                             result.output)
360

    
361
  f = open(pub_key, 'r')
362
  try:
363
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
364
  finally:
365
    f.close()
366

    
367

    
368
def _InitGanetiServerSetup(ss):
369
  """Setup the necessary configuration for the initial node daemon.
370

371
  This creates the nodepass file containing the shared password for
372
  the cluster and also generates the SSL certificate.
373

374
  """
375
  # Create pseudo random password
376
  randpass = sha.new(os.urandom(64)).hexdigest()
377
  # and write it into sstore
378
  ss.SetKey(ss.SS_NODED_PASS, randpass)
379

    
380
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
381
                         "-days", str(365*5), "-nodes", "-x509",
382
                         "-keyout", constants.SSL_CERT_FILE,
383
                         "-out", constants.SSL_CERT_FILE, "-batch"])
384
  if result.failed:
385
    raise errors.OpExecError("could not generate server ssl cert, command"
386
                             " %s had exitcode %s and error message %s" %
387
                             (result.cmd, result.exit_code, result.output))
388

    
389
  os.chmod(constants.SSL_CERT_FILE, 0400)
390

    
391
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
392

    
393
  if result.failed:
394
    raise errors.OpExecError("Could not start the node daemon, command %s"
395
                             " had exitcode %s and error %s" %
396
                             (result.cmd, result.exit_code, result.output))
397

    
398

    
399
def _CheckInstanceBridgesExist(instance):
400
  """Check that the brigdes needed by an instance exist.
401

402
  """
403
  # check bridges existence
404
  brlist = [nic.bridge for nic in instance.nics]
405
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
406
    raise errors.OpPrereqError("one or more target bridges %s does not"
407
                               " exist on destination node '%s'" %
408
                               (brlist, instance.primary_node))
409

    
410

    
411
class LUInitCluster(LogicalUnit):
412
  """Initialise the cluster.
413

414
  """
415
  HPATH = "cluster-init"
416
  HTYPE = constants.HTYPE_CLUSTER
417
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
418
              "def_bridge", "master_netdev", "file_storage_dir"]
419
  REQ_CLUSTER = False
420

    
421
  def BuildHooksEnv(self):
422
    """Build hooks env.
423

424
    Notes: Since we don't require a cluster, we must manually add
425
    ourselves to the post-run node list.
426

427
    """
428
    env = {"OP_TARGET": self.op.cluster_name}
429
    return env, [], [self.hostname.name]
430

    
431
  def CheckPrereq(self):
432
    """Verify that the passed name is a valid one.
433

434
    """
435
    if config.ConfigWriter.IsCluster():
436
      raise errors.OpPrereqError("Cluster is already initialised")
437

    
438
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
439
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
440
        raise errors.OpPrereqError("Please prepare the cluster VNC"
441
                                   "password file %s" %
442
                                   constants.VNC_PASSWORD_FILE)
443

    
444
    self.hostname = hostname = utils.HostInfo()
445

    
446
    if hostname.ip.startswith("127."):
447
      raise errors.OpPrereqError("This host's IP resolves to the private"
448
                                 " range (%s). Please fix DNS or %s." %
449
                                 (hostname.ip, constants.ETC_HOSTS))
450

    
451
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
452
                         source=constants.LOCALHOST_IP_ADDRESS):
453
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
454
                                 " to %s,\nbut this ip address does not"
455
                                 " belong to this host."
456
                                 " Aborting." % hostname.ip)
457

    
458
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
459

    
460
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
461
                     timeout=5):
462
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
463

    
464
    secondary_ip = getattr(self.op, "secondary_ip", None)
465
    if secondary_ip and not utils.IsValidIP(secondary_ip):
466
      raise errors.OpPrereqError("Invalid secondary ip given")
467
    if (secondary_ip and
468
        secondary_ip != hostname.ip and
469
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
470
                           source=constants.LOCALHOST_IP_ADDRESS))):
471
      raise errors.OpPrereqError("You gave %s as secondary IP,"
472
                                 " but it does not belong to this host." %
473
                                 secondary_ip)
474
    self.secondary_ip = secondary_ip
475

    
476
    if not hasattr(self.op, "vg_name"):
477
      self.op.vg_name = None
478
    # if vg_name not None, checks if volume group is valid
479
    if self.op.vg_name:
480
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
481
      if vgstatus:
482
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
483
                                   " you are not using lvm" % vgstatus)
484

    
485
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
486

    
487
    if not os.path.isabs(self.op.file_storage_dir):
488
      raise errors.OpPrereqError("The file storage directory you have is"
489
                                 " not an absolute path.")
490

    
491
    if not os.path.exists(self.op.file_storage_dir):
492
      try:
493
        os.makedirs(self.op.file_storage_dir, 0750)
494
      except OSError, err:
495
        raise errors.OpPrereqError("Cannot create file storage directory"
496
                                   " '%s': %s" %
497
                                   (self.op.file_storage_dir, err))
498

    
499
    if not os.path.isdir(self.op.file_storage_dir):
500
      raise errors.OpPrereqError("The file storage directory '%s' is not"
501
                                 " a directory." % self.op.file_storage_dir)
502

    
503
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
504
                    self.op.mac_prefix):
505
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
506
                                 self.op.mac_prefix)
507

    
508
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
509
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
510
                                 self.op.hypervisor_type)
511

    
512
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
513
    if result.failed:
514
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
515
                                 (self.op.master_netdev,
516
                                  result.output.strip()))
517

    
518
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
519
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
520
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
521
                                 " executable." % constants.NODE_INITD_SCRIPT)
522

    
523
  def Exec(self, feedback_fn):
524
    """Initialize the cluster.
525

526
    """
527
    clustername = self.clustername
528
    hostname = self.hostname
529

    
530
    # set up the simple store
531
    self.sstore = ss = ssconf.SimpleStore()
532
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
533
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
534
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
535
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
536
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
537
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
538

    
539
    # set up the inter-node password and certificate
540
    _InitGanetiServerSetup(ss)
541

    
542
    # start the master ip
543
    rpc.call_node_start_master(hostname.name)
544

    
545
    # set up ssh config and /etc/hosts
546
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
547
    try:
548
      sshline = f.read()
549
    finally:
550
      f.close()
551
    sshkey = sshline.split(" ")[1]
552

    
553
    _AddHostToEtcHosts(hostname.name)
554
    _InitSSHSetup(hostname.name)
555

    
556
    # init of cluster config file
557
    self.cfg = cfgw = config.ConfigWriter()
558
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
559
                    sshkey, self.op.mac_prefix,
560
                    self.op.vg_name, self.op.def_bridge)
561

    
562
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
563

    
564

    
565
class LUDestroyCluster(NoHooksLU):
566
  """Logical unit for destroying the cluster.
567

568
  """
569
  _OP_REQP = []
570

    
571
  def CheckPrereq(self):
572
    """Check prerequisites.
573

574
    This checks whether the cluster is empty.
575

576
    Any errors are signalled by raising errors.OpPrereqError.
577

578
    """
579
    master = self.sstore.GetMasterNode()
580

    
581
    nodelist = self.cfg.GetNodeList()
582
    if len(nodelist) != 1 or nodelist[0] != master:
583
      raise errors.OpPrereqError("There are still %d node(s) in"
584
                                 " this cluster." % (len(nodelist) - 1))
585
    instancelist = self.cfg.GetInstanceList()
586
    if instancelist:
587
      raise errors.OpPrereqError("There are still %d instance(s) in"
588
                                 " this cluster." % len(instancelist))
589

    
590
  def Exec(self, feedback_fn):
591
    """Destroys the cluster.
592

593
    """
594
    master = self.sstore.GetMasterNode()
595
    if not rpc.call_node_stop_master(master):
596
      raise errors.OpExecError("Could not disable the master role")
597
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
598
    utils.CreateBackup(priv_key)
599
    utils.CreateBackup(pub_key)
600
    rpc.call_node_leave_cluster(master)
601

    
602

    
603
class LUVerifyCluster(NoHooksLU):
604
  """Verifies the cluster status.
605

606
  """
607
  _OP_REQP = ["skip_checks"]
608

    
609
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
610
                  remote_version, feedback_fn):
611
    """Run multiple tests against a node.
612

613
    Test list:
614
      - compares ganeti version
615
      - checks vg existance and size > 20G
616
      - checks config file checksum
617
      - checks ssh to other nodes
618

619
    Args:
620
      node: name of the node to check
621
      file_list: required list of files
622
      local_cksum: dictionary of local files and their checksums
623

624
    """
625
    # compares ganeti version
626
    local_version = constants.PROTOCOL_VERSION
627
    if not remote_version:
628
      feedback_fn("  - ERROR: connection to %s failed" % (node))
629
      return True
630

    
631
    if local_version != remote_version:
632
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
633
                      (local_version, node, remote_version))
634
      return True
635

    
636
    # checks vg existence and size > 20G
637

    
638
    bad = False
639
    if not vglist:
640
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
641
                      (node,))
642
      bad = True
643
    else:
644
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
645
      if vgstatus:
646
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
647
        bad = True
648

    
649
    # checks config file checksum
650
    # checks ssh to any
651

    
652
    if 'filelist' not in node_result:
653
      bad = True
654
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
655
    else:
656
      remote_cksum = node_result['filelist']
657
      for file_name in file_list:
658
        if file_name not in remote_cksum:
659
          bad = True
660
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
661
        elif remote_cksum[file_name] != local_cksum[file_name]:
662
          bad = True
663
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
664

    
665
    if 'nodelist' not in node_result:
666
      bad = True
667
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
668
    else:
669
      if node_result['nodelist']:
670
        bad = True
671
        for node in node_result['nodelist']:
672
          feedback_fn("  - ERROR: communication with node '%s': %s" %
673
                          (node, node_result['nodelist'][node]))
674
    hyp_result = node_result.get('hypervisor', None)
675
    if hyp_result is not None:
676
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
677
    return bad
678

    
679
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
680
                      node_instance, feedback_fn):
681
    """Verify an instance.
682

683
    This function checks to see if the required block devices are
684
    available on the instance's node.
685

686
    """
687
    bad = False
688

    
689
    node_current = instanceconfig.primary_node
690

    
691
    node_vol_should = {}
692
    instanceconfig.MapLVsByNode(node_vol_should)
693

    
694
    for node in node_vol_should:
695
      for volume in node_vol_should[node]:
696
        if node not in node_vol_is or volume not in node_vol_is[node]:
697
          feedback_fn("  - ERROR: volume %s missing on node %s" %
698
                          (volume, node))
699
          bad = True
700

    
701
    if instanceconfig.status != 'down':
702
      if (node_current not in node_instance or
703
          not instance in node_instance[node_current]):
704
        feedback_fn("  - ERROR: instance %s not running on node %s" %
705
                        (instance, node_current))
706
        bad = True
707

    
708
    for node in node_instance:
709
      if node != node_current:
710
        if instance in node_instance[node]:
711
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
712
                          (instance, node))
713
          bad = True
714

    
715
    return bad
716

    
717
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
718
    """Verify if there are any unknown volumes in the cluster.
719

720
    The .os, .swap and backup volumes are ignored. All other volumes are
721
    reported as unknown.
722

723
    """
724
    bad = False
725

    
726
    for node in node_vol_is:
727
      for volume in node_vol_is[node]:
728
        if node not in node_vol_should or volume not in node_vol_should[node]:
729
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
730
                      (volume, node))
731
          bad = True
732
    return bad
733

    
734
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
735
    """Verify the list of running instances.
736

737
    This checks what instances are running but unknown to the cluster.
738

739
    """
740
    bad = False
741
    for node in node_instance:
742
      for runninginstance in node_instance[node]:
743
        if runninginstance not in instancelist:
744
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
745
                          (runninginstance, node))
746
          bad = True
747
    return bad
748

    
749
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
750
    """Verify N+1 Memory Resilience.
751

752
    Check that if one single node dies we can still start all the instances it
753
    was primary for.
754

755
    """
756
    bad = False
757

    
758
    for node, nodeinfo in node_info.iteritems():
759
      # This code checks that every node which is now listed as secondary has
760
      # enough memory to host all instances it is secondary for, should a single
761
      # other node in the cluster fail.
762
      # FIXME: not ready for failover to an arbitrary node
763
      # FIXME: does not support file-backed instances
764
      # WARNING: we currently take into account down instances as well as up
765
      # ones, considering that even if they're down someone might want to start
766
      # them even in the event of a node failure.
767
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
768
        needed_mem = 0
769
        for instance in instances:
770
          needed_mem += instance_cfg[instance].memory
771
        if nodeinfo['mfree'] < needed_mem:
772
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
773
                      " failovers should node %s fail" % (node, prinode))
774
          bad = True
775
    return bad
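# Worked example (hypothetical sizes): if node2 is secondary for instances A
# (512 MiB) and B (1024 MiB) which both have node1 as their primary, node2
# needs at least 1536 MiB free to pass this check for a failure of node1;
# instances whose primary is a different node are summed separately, per
# primary node.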
776

    
777
  def CheckPrereq(self):
778
    """Check prerequisites.
779

780
    Transform the list of checks we're going to skip into a set and check that
781
    all its members are valid.
782

783
    """
784
    self.skip_set = frozenset(self.op.skip_checks)
785
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
786
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
787

    
788
  def Exec(self, feedback_fn):
789
    """Verify integrity of cluster, performing various test on nodes.
790

791
    """
792
    bad = False
793
    feedback_fn("* Verifying global settings")
794
    for msg in self.cfg.VerifyConfig():
795
      feedback_fn("  - ERROR: %s" % msg)
796

    
797
    vg_name = self.cfg.GetVGName()
798
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
799
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
800
    i_non_redundant = [] # Non redundant instances
801
    node_volume = {}
802
    node_instance = {}
803
    node_info = {}
804
    instance_cfg = {}
805

    
806
    # FIXME: verify OS list
807
    # do local checksums
808
    file_names = list(self.sstore.GetFileList())
809
    file_names.append(constants.SSL_CERT_FILE)
810
    file_names.append(constants.CLUSTER_CONF_FILE)
811
    local_checksums = utils.FingerprintFiles(file_names)
812

    
813
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
814
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
815
    all_instanceinfo = rpc.call_instance_list(nodelist)
816
    all_vglist = rpc.call_vg_list(nodelist)
817
    node_verify_param = {
818
      'filelist': file_names,
819
      'nodelist': nodelist,
820
      'hypervisor': None,
821
      }
822
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
823
    all_rversion = rpc.call_version(nodelist)
824
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
825

    
826
    for node in nodelist:
827
      feedback_fn("* Verifying node %s" % node)
828
      result = self._VerifyNode(node, file_names, local_checksums,
829
                                all_vglist[node], all_nvinfo[node],
830
                                all_rversion[node], feedback_fn)
831
      bad = bad or result
832

    
833
      # node_volume
834
      volumeinfo = all_volumeinfo[node]
835

    
836
      if isinstance(volumeinfo, basestring):
837
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
838
                    (node, volumeinfo[-400:].encode('string_escape')))
839
        bad = True
840
        node_volume[node] = {}
841
      elif not isinstance(volumeinfo, dict):
842
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
843
        bad = True
844
        continue
845
      else:
846
        node_volume[node] = volumeinfo
847

    
848
      # node_instance
849
      nodeinstance = all_instanceinfo[node]
850
      if type(nodeinstance) != list:
851
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
852
        bad = True
853
        continue
854

    
855
      node_instance[node] = nodeinstance
856

    
857
      # node_info
858
      nodeinfo = all_ninfo[node]
859
      if not isinstance(nodeinfo, dict):
860
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
861
        bad = True
862
        continue
863

    
864
      try:
865
        node_info[node] = {
866
          "mfree": int(nodeinfo['memory_free']),
867
          "dfree": int(nodeinfo['vg_free']),
868
          "pinst": [],
869
          "sinst": [],
870
          # dictionary holding all instances this node is secondary for,
871
          # grouped by their primary node. Each key is a cluster node, and each
872
          # value is a list of instances which have the key as primary and the
873
          # current node as secondary.  this is handy to calculate N+1 memory
874
          # availability if you can only failover from a primary to its
875
          # secondary.
876
          "sinst-by-pnode": {},
877
        }
878
      except ValueError:
879
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
880
        bad = True
881
        continue
882

    
883
    node_vol_should = {}
884

    
885
    for instance in instancelist:
886
      feedback_fn("* Verifying instance %s" % instance)
887
      inst_config = self.cfg.GetInstanceInfo(instance)
888
      result =  self._VerifyInstance(instance, inst_config, node_volume,
889
                                     node_instance, feedback_fn)
890
      bad = bad or result
891

    
892
      inst_config.MapLVsByNode(node_vol_should)
893

    
894
      instance_cfg[instance] = inst_config
895

    
896
      pnode = inst_config.primary_node
897
      if pnode in node_info:
898
        node_info[pnode]['pinst'].append(instance)
899
      else:
900
        feedback_fn("  - ERROR: instance %s, connection to primary node"
901
                    " %s failed" % (instance, pnode))
902
        bad = True
903

    
904
      # If the instance is non-redundant we cannot survive losing its primary
905
      # node, so we are not N+1 compliant. On the other hand we have no disk
906
      # templates with more than one secondary so that situation is not well
907
      # supported either.
908
      # FIXME: does not support file-backed instances
909
      if len(inst_config.secondary_nodes) == 0:
910
        i_non_redundant.append(instance)
911
      elif len(inst_config.secondary_nodes) > 1:
912
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
913
                    % instance)
914

    
915
      for snode in inst_config.secondary_nodes:
916
        if snode in node_info:
917
          node_info[snode]['sinst'].append(instance)
918
          if pnode not in node_info[snode]['sinst-by-pnode']:
919
            node_info[snode]['sinst-by-pnode'][pnode] = []
920
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
921
        else:
922
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
923
                      " %s failed" % (instance, snode))
924

    
925
    feedback_fn("* Verifying orphan volumes")
926
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
927
                                       feedback_fn)
928
    bad = bad or result
929

    
930
    feedback_fn("* Verifying remaining instances")
931
    result = self._VerifyOrphanInstances(instancelist, node_instance,
932
                                         feedback_fn)
933
    bad = bad or result
934

    
935
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
936
      feedback_fn("* Verifying N+1 Memory redundancy")
937
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
938
      bad = bad or result
939

    
940
    feedback_fn("* Other Notes")
941
    if i_non_redundant:
942
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
943
                  % len(i_non_redundant))
944

    
945
    return int(bad)
946

    
947

    
948
class LUVerifyDisks(NoHooksLU):
949
  """Verifies the cluster disks status.
950

951
  """
952
  _OP_REQP = []
953

    
954
  def CheckPrereq(self):
955
    """Check prerequisites.
956

957
    This has no prerequisites.
958

959
    """
960
    pass
961

    
962
  def Exec(self, feedback_fn):
963
    """Verify integrity of cluster disks.
964

965
    """
966
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
967

    
968
    vg_name = self.cfg.GetVGName()
969
    nodes = utils.NiceSort(self.cfg.GetNodeList())
970
    instances = [self.cfg.GetInstanceInfo(name)
971
                 for name in self.cfg.GetInstanceList()]
972

    
973
    nv_dict = {}
974
    for inst in instances:
975
      inst_lvs = {}
976
      if (inst.status != "up" or
977
          inst.disk_template not in constants.DTS_NET_MIRROR):
978
        continue
979
      inst.MapLVsByNode(inst_lvs)
980
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
981
      for node, vol_list in inst_lvs.iteritems():
982
        for vol in vol_list:
983
          nv_dict[(node, vol)] = inst
984

    
985
    if not nv_dict:
986
      return result
987

    
988
    node_lvs = rpc.call_volume_list(nodes, vg_name)
989

    
990
    to_act = set()
991
    for node in nodes:
992
      # node_volume
993
      lvs = node_lvs[node]
994

    
995
      if isinstance(lvs, basestring):
996
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
997
        res_nlvm[node] = lvs
998
      elif not isinstance(lvs, dict):
999
        logger.Info("connection to node %s failed or invalid data returned" %
1000
                    (node,))
1001
        res_nodes.append(node)
1002
        continue
1003

    
1004
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1005
        inst = nv_dict.pop((node, lv_name), None)
1006
        if (not lv_online and inst is not None
1007
            and inst.name not in res_instances):
1008
          res_instances.append(inst.name)
1009

    
1010
    # any leftover items in nv_dict are missing LVs, let's arrange the
1011
    # data better
1012
    for key, inst in nv_dict.iteritems():
1013
      if inst.name not in res_missing:
1014
        res_missing[inst.name] = []
1015
      res_missing[inst.name].append(key)
1016

    
1017
    return result
1018

    
1019

    
1020
class LURenameCluster(LogicalUnit):
1021
  """Rename the cluster.
1022

1023
  """
1024
  HPATH = "cluster-rename"
1025
  HTYPE = constants.HTYPE_CLUSTER
1026
  _OP_REQP = ["name"]
1027

    
1028
  def BuildHooksEnv(self):
1029
    """Build hooks env.
1030

1031
    """
1032
    env = {
1033
      "OP_TARGET": self.sstore.GetClusterName(),
1034
      "NEW_NAME": self.op.name,
1035
      }
1036
    mn = self.sstore.GetMasterNode()
1037
    return env, [mn], [mn]
1038

    
1039
  def CheckPrereq(self):
1040
    """Verify that the passed name is a valid one.
1041

1042
    """
1043
    hostname = utils.HostInfo(self.op.name)
1044

    
1045
    new_name = hostname.name
1046
    self.ip = new_ip = hostname.ip
1047
    old_name = self.sstore.GetClusterName()
1048
    old_ip = self.sstore.GetMasterIP()
1049
    if new_name == old_name and new_ip == old_ip:
1050
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1051
                                 " cluster has changed")
1052
    if new_ip != old_ip:
1053
      result = utils.RunCmd(["fping", "-q", new_ip])
1054
      if not result.failed:
1055
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1056
                                   " reachable on the network. Aborting." %
1057
                                   new_ip)
1058

    
1059
    self.op.name = new_name
1060

    
1061
  def Exec(self, feedback_fn):
1062
    """Rename the cluster.
1063

1064
    """
1065
    clustername = self.op.name
1066
    ip = self.ip
1067
    ss = self.sstore
1068

    
1069
    # shutdown the master IP
1070
    master = ss.GetMasterNode()
1071
    if not rpc.call_node_stop_master(master):
1072
      raise errors.OpExecError("Could not disable the master role")
1073

    
1074
    try:
1075
      # modify the sstore
1076
      ss.SetKey(ss.SS_MASTER_IP, ip)
1077
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1078

    
1079
      # Distribute updated ss config to all nodes
1080
      myself = self.cfg.GetNodeInfo(master)
1081
      dist_nodes = self.cfg.GetNodeList()
1082
      if myself.name in dist_nodes:
1083
        dist_nodes.remove(myself.name)
1084

    
1085
      logger.Debug("Copying updated ssconf data to all nodes")
1086
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1087
        fname = ss.KeyToFilename(keyname)
1088
        result = rpc.call_upload_file(dist_nodes, fname)
1089
        for to_node in dist_nodes:
1090
          if not result[to_node]:
1091
            logger.Error("copy of file %s to node %s failed" %
1092
                         (fname, to_node))
1093
    finally:
1094
      if not rpc.call_node_start_master(master):
1095
        logger.Error("Could not re-enable the master role on the master,"
1096
                     " please restart manually.")
1097

    
1098

    
1099
def _RecursiveCheckIfLVMBased(disk):
1100
  """Check if the given disk or its children are lvm-based.
1101

1102
  Args:
1103
    disk: ganeti.objects.Disk object
1104

1105
  Returns:
1106
    boolean indicating whether a LD_LV dev_type was found or not
1107

1108
  """
1109
  if disk.children:
1110
    for chdisk in disk.children:
1111
      if _RecursiveCheckIfLVMBased(chdisk):
1112
        return True
1113
  return disk.dev_type == constants.LD_LV
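# Illustrative sketch (constructor arguments abbreviated/hypothetical): a bare
# logical volume is LVM-based, and a parent device becomes LVM-based as soon
# as any of its children is.
#
#   lv = objects.Disk(dev_type=constants.LD_LV, size=1024,
#                     logical_id=("xenvg", "example-lv"), iv_name="sda")
#   _RecursiveCheckIfLVMBased(lv)        # -> True
#   # _RecursiveCheckIfLVMBased(parent_with_lv_child) would also be True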
1114

    
1115

    
1116
class LUSetClusterParams(LogicalUnit):
1117
  """Change the parameters of the cluster.
1118

1119
  """
1120
  HPATH = "cluster-modify"
1121
  HTYPE = constants.HTYPE_CLUSTER
1122
  _OP_REQP = []
1123

    
1124
  def BuildHooksEnv(self):
1125
    """Build hooks env.
1126

1127
    """
1128
    env = {
1129
      "OP_TARGET": self.sstore.GetClusterName(),
1130
      "NEW_VG_NAME": self.op.vg_name,
1131
      }
1132
    mn = self.sstore.GetMasterNode()
1133
    return env, [mn], [mn]
1134

    
1135
  def CheckPrereq(self):
1136
    """Check prerequisites.
1137

1138
    This checks whether the given params don't conflict and
1139
    if the given volume group is valid.
1140

1141
    """
1142
    if not self.op.vg_name:
1143
      instances = [self.cfg.GetInstanceInfo(name)
1144
                   for name in self.cfg.GetInstanceList()]
1145
      for inst in instances:
1146
        for disk in inst.disks:
1147
          if _RecursiveCheckIfLVMBased(disk):
1148
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1149
                                       " lvm-based instances exist")
1150

    
1151
    # if vg_name not None, checks given volume group on all nodes
1152
    if self.op.vg_name:
1153
      node_list = self.cfg.GetNodeList()
1154
      vglist = rpc.call_vg_list(node_list)
1155
      for node in node_list:
1156
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1157
        if vgstatus:
1158
          raise errors.OpPrereqError("Error on node '%s': %s" %
1159
                                     (node, vgstatus))
1160

    
1161
  def Exec(self, feedback_fn):
1162
    """Change the parameters of the cluster.
1163

1164
    """
1165
    if self.op.vg_name != self.cfg.GetVGName():
1166
      self.cfg.SetVGName(self.op.vg_name)
1167
    else:
1168
      feedback_fn("Cluster LVM configuration already in desired"
1169
                  " state, not changing")
1170

    
1171

    
1172
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1173
  """Sleep and poll for an instance's disk to sync.
1174

1175
  """
1176
  if not instance.disks:
1177
    return True
1178

    
1179
  if not oneshot:
1180
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1181

    
1182
  node = instance.primary_node
1183

    
1184
  for dev in instance.disks:
1185
    cfgw.SetDiskID(dev, node)
1186

    
1187
  retries = 0
1188
  while True:
1189
    max_time = 0
1190
    done = True
1191
    cumul_degraded = False
1192
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1193
    if not rstats:
1194
      proc.LogWarning("Can't get any data from node %s" % node)
1195
      retries += 1
1196
      if retries >= 10:
1197
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1198
                                 " aborting." % node)
1199
      time.sleep(6)
1200
      continue
1201
    retries = 0
1202
    for i in range(len(rstats)):
1203
      mstat = rstats[i]
1204
      if mstat is None:
1205
        proc.LogWarning("Can't compute data for node %s/%s" %
1206
                        (node, instance.disks[i].iv_name))
1207
        continue
1208
      # we ignore the ldisk parameter
1209
      perc_done, est_time, is_degraded, _ = mstat
1210
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1211
      if perc_done is not None:
1212
        done = False
1213
        if est_time is not None:
1214
          rem_time = "%d estimated seconds remaining" % est_time
1215
          max_time = est_time
1216
        else:
1217
          rem_time = "no time estimate"
1218
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1219
                     (instance.disks[i].iv_name, perc_done, rem_time))
1220
    if done or oneshot:
1221
      break
1222

    
1223
    if unlock:
1224
      utils.Unlock('cmd')
1225
    try:
1226
      time.sleep(min(60, max_time))
1227
    finally:
1228
      if unlock:
1229
        utils.Lock('cmd')
1230

    
1231
  if done:
1232
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1233
  return not cumul_degraded
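# For reference: each entry returned by rpc.call_blockdev_getmirrorstatus is
# unpacked above as (perc_done, est_time, is_degraded, ldisk); perc_done is
# None once a device is fully synced, which is what lets the loop finish.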
1234

    
1235

    
1236
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1237
  """Check that mirrors are not degraded.
1238

1239
  The ldisk parameter, if True, will change the test from the
1240
  is_degraded attribute (which represents overall non-ok status for
1241
  the device(s)) to the ldisk (representing the local storage status).
1242

1243
  """
1244
  cfgw.SetDiskID(dev, node)
1245
  if ldisk:
1246
    idx = 6
1247
  else:
1248
    idx = 5
1249

    
1250
  result = True
1251
  if on_primary or dev.AssembleOnSecondary():
1252
    rstats = rpc.call_blockdev_find(node, dev)
1253
    if not rstats:
1254
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1255
      result = False
1256
    else:
1257
      result = result and (not rstats[idx])
1258
  if dev.children:
1259
    for child in dev.children:
1260
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1261

    
1262
  return result
1263

    
1264

    
1265
class LUDiagnoseOS(NoHooksLU):
1266
  """Logical unit for OS diagnose/query.
1267

1268
  """
1269
  _OP_REQP = ["output_fields", "names"]
1270

    
1271
  def CheckPrereq(self):
1272
    """Check prerequisites.
1273

1274
    This always succeeds, since this is a pure query LU.
1275

1276
    """
1277
    if self.op.names:
1278
      raise errors.OpPrereqError("Selective OS query not supported")
1279

    
1280
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1281
    _CheckOutputFields(static=[],
1282
                       dynamic=self.dynamic_fields,
1283
                       selected=self.op.output_fields)
1284

    
1285
  @staticmethod
1286
  def _DiagnoseByOS(node_list, rlist):
1287
    """Remaps a per-node return list into an a per-os per-node dictionary
1288

1289
      Args:
1290
        node_list: a list with the names of all nodes
1291
        rlist: a map with node names as keys and OS objects as values
1292

1293
      Returns:
1294
        map: a map with osnames as keys and as value another map, with
1295
             nodes as keys and
1296
             lists of OS objects as values,
1297
             e.g. {"debian-etch": {"node1": [<object>,...],
1298
                                   "node2": [<object>,]}
1299
                  }
1300

1301
    """
1302
    all_os = {}
1303
    for node_name, nr in rlist.iteritems():
1304
      if not nr:
1305
        continue
1306
      for os in nr:
1307
        if os.name not in all_os:
1308
          # build a list of nodes for this os containing empty lists
1309
          # for each node in node_list
1310
          all_os[os.name] = {}
1311
          for nname in node_list:
1312
            all_os[os.name][nname] = []
1313
        all_os[os.name][node_name].append(os)
1314
    return all_os
1315

    
1316
  def Exec(self, feedback_fn):
1317
    """Compute the list of OSes.
1318

1319
    """
1320
    node_list = self.cfg.GetNodeList()
1321
    node_data = rpc.call_os_diagnose(node_list)
1322
    if node_data == False:
1323
      raise errors.OpExecError("Can't gather the list of OSes")
1324
    pol = self._DiagnoseByOS(node_list, node_data)
1325
    output = []
1326
    for os_name, os_data in pol.iteritems():
1327
      row = []
1328
      for field in self.op.output_fields:
1329
        if field == "name":
1330
          val = os_name
1331
        elif field == "valid":
1332
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1333
        elif field == "node_status":
1334
          val = {}
1335
          for node_name, nos_list in os_data.iteritems():
1336
            val[node_name] = [(v.status, v.path) for v in nos_list]
1337
        else:
1338
          raise errors.ParameterError(field)
1339
        row.append(val)
1340
      output.append(row)
1341

    
1342
    return output
1343

    
1344

    
1345
class LURemoveNode(LogicalUnit):
1346
  """Logical unit for removing a node.
1347

1348
  """
1349
  HPATH = "node-remove"
1350
  HTYPE = constants.HTYPE_NODE
1351
  _OP_REQP = ["node_name"]
1352

    
1353
  def BuildHooksEnv(self):
1354
    """Build hooks env.
1355

1356
    This doesn't run on the target node in the pre phase as a failed
1357
    node would not allow itself to run.
1358

1359
    """
1360
    env = {
1361
      "OP_TARGET": self.op.node_name,
1362
      "NODE_NAME": self.op.node_name,
1363
      }
1364
    all_nodes = self.cfg.GetNodeList()
1365
    all_nodes.remove(self.op.node_name)
1366
    return env, all_nodes, all_nodes
1367

    
1368
  def CheckPrereq(self):
1369
    """Check prerequisites.
1370

1371
    This checks:
1372
     - the node exists in the configuration
1373
     - it does not have primary or secondary instances
1374
     - it's not the master
1375

1376
    Any errors are signalled by raising errors.OpPrereqError.
1377

1378
    """
1379
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1380
    if node is None:
1381
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1382

    
1383
    instance_list = self.cfg.GetInstanceList()
1384

    
1385
    masternode = self.sstore.GetMasterNode()
1386
    if node.name == masternode:
1387
      raise errors.OpPrereqError("Node is the master node,"
1388
                                 " you need to failover first.")
1389

    
1390
    for instance_name in instance_list:
1391
      instance = self.cfg.GetInstanceInfo(instance_name)
1392
      if node.name == instance.primary_node:
1393
        raise errors.OpPrereqError("Instance %s still running on the node,"
1394
                                   " please remove first." % instance_name)
1395
      if node.name in instance.secondary_nodes:
1396
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1397
                                   " please remove first." % instance_name)
1398
    self.op.node_name = node.name
1399
    self.node = node
1400

    
1401
  def Exec(self, feedback_fn):
1402
    """Removes the node from the cluster.
1403

1404
    """
1405
    node = self.node
1406
    logger.Info("stopping the node daemon and removing configs from node %s" %
1407
                node.name)
1408

    
1409
    rpc.call_node_leave_cluster(node.name)
1410

    
1411
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1412

    
1413
    logger.Info("Removing node %s from config" % node.name)
1414

    
1415
    self.cfg.RemoveNode(node.name)
1416

    
1417
    _RemoveHostFromEtcHosts(node.name)
1418

    
1419

    
1420
class LUQueryNodes(NoHooksLU):
1421
  """Logical unit for querying nodes.
1422

1423
  """
1424
  _OP_REQP = ["output_fields", "names"]
1425

    
1426
  def CheckPrereq(self):
1427
    """Check prerequisites.
1428

1429
    This checks that the fields required are valid output fields.
1430

1431
    """
1432
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1433
                                     "mtotal", "mnode", "mfree",
1434
                                     "bootid"])
1435

    
1436
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1437
                               "pinst_list", "sinst_list",
1438
                               "pip", "sip"],
1439
                       dynamic=self.dynamic_fields,
1440
                       selected=self.op.output_fields)
1441

    
1442
    self.wanted = _GetWantedNodes(self, self.op.names)
1443

    
1444
  def Exec(self, feedback_fn):
1445
    """Computes the list of nodes and their attributes.
1446

1447
    """
1448
    nodenames = self.wanted
1449
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1450

    
1451
    # begin data gathering
1452

    
1453
    if self.dynamic_fields.intersection(self.op.output_fields):
1454
      live_data = {}
1455
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1456
      for name in nodenames:
1457
        nodeinfo = node_data.get(name, None)
1458
        if nodeinfo:
1459
          live_data[name] = {
1460
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1461
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1462
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1463
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1464
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1465
            "bootid": nodeinfo['bootid'],
1466
            }
1467
        else:
1468
          live_data[name] = {}
1469
    else:
1470
      live_data = dict.fromkeys(nodenames, {})
1471

    
1472
    node_to_primary = dict([(name, set()) for name in nodenames])
1473
    node_to_secondary = dict([(name, set()) for name in nodenames])
1474

    
1475
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1476
                             "sinst_cnt", "sinst_list"))
1477
    if inst_fields & frozenset(self.op.output_fields):
1478
      instancelist = self.cfg.GetInstanceList()
1479

    
1480
      for instance_name in instancelist:
1481
        inst = self.cfg.GetInstanceInfo(instance_name)
1482
        if inst.primary_node in node_to_primary:
1483
          node_to_primary[inst.primary_node].add(inst.name)
1484
        for secnode in inst.secondary_nodes:
1485
          if secnode in node_to_secondary:
1486
            node_to_secondary[secnode].add(inst.name)
1487

    
1488
    # end data gathering
1489

    
1490
    output = []
1491
    for node in nodelist:
1492
      node_output = []
1493
      for field in self.op.output_fields:
1494
        if field == "name":
1495
          val = node.name
1496
        elif field == "pinst_list":
1497
          val = list(node_to_primary[node.name])
1498
        elif field == "sinst_list":
1499
          val = list(node_to_secondary[node.name])
1500
        elif field == "pinst_cnt":
1501
          val = len(node_to_primary[node.name])
1502
        elif field == "sinst_cnt":
1503
          val = len(node_to_secondary[node.name])
1504
        elif field == "pip":
1505
          val = node.primary_ip
1506
        elif field == "sip":
1507
          val = node.secondary_ip
1508
        elif field in self.dynamic_fields:
1509
          val = live_data[node.name].get(field, None)
1510
        else:
1511
          raise errors.ParameterError(field)
1512
        node_output.append(val)
1513
      output.append(node_output)
1514

    
1515
    return output
1516

    
1517

    
1518
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


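# Illustrative note on _AssembleInstanceDisks above (not part of the original
# module): on success the second return value is a list of triples such as,
# for a hypothetical two-disk DRBD instance,
#   [("node1.example.com", "sda", "/dev/drbd0"),
#    ("node1.example.com", "sdb", "/dev/drbd1")]
# i.e. (primary node, instance-visible name, node-visible device) per disk;
# the exact device strings depend on what call_blockdev_assemble reports.
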
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored (they cause the function to return False).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


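# Usage note for _CheckNodeFreeMemory above (not part of the original module):
# callers such as LUStartupInstance and LUFailoverInstance below invoke it
# roughly as
#   _CheckNodeFreeMemory(self.cfg, node, "starting instance %s" % name, mem)
# and simply let a raised OpPrereqError abort the operation.
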
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


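# Illustrative sketch for _GenerateUniqueNames above (not part of the original
# module): a hypothetical call
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
# returns one "<unique-id><ext>" string per extension, with the IDs coming
# from cfg.GenerateUniqueID(), e.g. roughly
#   ["d2a4...-uuid.sda", "d2a4...-uuid.sdb"]
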
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


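# Illustrative sketch for _GenerateDRBD8Branch above (not part of the original
# module): a hypothetical call
#   _GenerateDRBD8Branch(cfg, "node1", "node2", 10240,
#                        ["x.sda_data", "x.sda_meta"], "sda")
# yields an objects.Disk tree roughly like
#   DRBD8 "sda", 10240 MB, logical_id ("node1", "node2", <allocated port>)
#     +- LV x.sda_data (10240 MB, data)
#     +- LV x.sda_meta (128 MB, metadata)
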
def _GenerateDiskTemplate(cfg, template_name,
2964
                          instance_name, primary_node,
2965
                          secondary_nodes, disk_sz, swap_sz,
2966
                          file_storage_dir, file_driver):
2967
  """Generate the entire disk layout for a given template type.
2968

2969
  """
2970
  #TODO: compute space requirements
2971

    
2972
  vgname = cfg.GetVGName()
2973
  if template_name == constants.DT_DISKLESS:
2974
    disks = []
2975
  elif template_name == constants.DT_PLAIN:
2976
    if len(secondary_nodes) != 0:
2977
      raise errors.ProgrammerError("Wrong template configuration")
2978

    
2979
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2980
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2981
                           logical_id=(vgname, names[0]),
2982
                           iv_name = "sda")
2983
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2984
                           logical_id=(vgname, names[1]),
2985
                           iv_name = "sdb")
2986
    disks = [sda_dev, sdb_dev]
2987
  elif template_name == constants.DT_DRBD8:
2988
    if len(secondary_nodes) != 1:
2989
      raise errors.ProgrammerError("Wrong template configuration")
2990
    remote_node = secondary_nodes[0]
2991
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2992
                                       ".sdb_data", ".sdb_meta"])
2993
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2994
                                         disk_sz, names[0:2], "sda")
2995
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2996
                                         swap_sz, names[2:4], "sdb")
2997
    disks = [drbd_sda_dev, drbd_sdb_dev]
2998
  elif template_name == constants.DT_FILE:
2999
    if len(secondary_nodes) != 0:
3000
      raise errors.ProgrammerError("Wrong template configuration")
3001

    
3002
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3003
                                iv_name="sda", logical_id=(file_driver,
3004
                                "%s/sda" % file_storage_dir))
3005
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3006
                                iv_name="sdb", logical_id=(file_driver,
3007
                                "%s/sdb" % file_storage_dir))
3008
    disks = [file_sda_dev, file_sdb_dev]
3009
  else:
3010
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3011
  return disks
3012
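
# Illustrative sketch (hypothetical values, not called anywhere): generating
# the layout for a plain LVM-backed instance; the resulting disks carry the
# iv_names "sda" and "sdb" that the rest of this module hard-codes.
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN,
#                                 "inst1.example.com", "node1.example.com",
#                                 [], 10240, 4096, "", None)
#   # -> [Disk(LD_LV, size=10240, iv_name="sda"),
#   #     Disk(LD_LV, size=4096, iv_name="sdb")]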

def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
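
# Worked example (illustrative): for an instance named "inst1.example.com",
#
#   _GetInstanceInfoText(instance)  ->  "originstname+inst1.example.com"
#
# and this string is what the blockdev creation calls in this module pass as
# their "info" argument, so physical volumes can be traced back to their
# instance.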

def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True

def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result
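
# Minimal sketch (illustrative, not called by any LU) of how the two helpers
# above are meant to be paired: disk creation is all-or-nothing, so a failed
# _CreateDisks() is immediately followed by a best-effort _RemoveDisks(); this
# is the same pattern LUCreateInstance.Exec uses further below.
def _ExampleCreateDisksOrCleanUp(cfg, instance):
  """Create the instance's disks, rolling back on failure."""
  if _CreateDisks(cfg, instance):
    return True
  # creation failed halfway; remove whatever was created (failures here are
  # only logged, since we are already on the error path)
  _RemoveDisks(instance, cfg)
  return False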

def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
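
# Worked examples for the size computation above (sizes in MB; None means the
# template does not consume space in the volume group at all):
#
#   _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096)     -> 14336
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)     -> 14592  (+256 meta)
#   _ComputeDiskSize(constants.DT_DISKLESS, 10240, 4096)  -> None
#   _ComputeDiskSize(constants.DT_FILE, 10240, 4096)      -> None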

class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     mode=constants.IALLOCATOR_MODE_ALLOC)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
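
  # The allocator request built above is, in sketch form (field names taken
  # from the constructor call; the concrete values are hypothetical):
  #
  #   IAllocator(cfg, sstore, mode=constants.IALLOCATOR_MODE_ALLOC,
  #              name="inst1.example.com", disk_template=self.op.disk_template,
  #              mem_size=512, vcpus=1,
  #              disks=[{"size": 10240, "mode": "w"},
  #                     {"size": 4096, "mode": "w"}],
  #              nics=[{"mac": "auto", "ip": None, "bridge": "xen-br0"}],
  #              tags=[], os="debian-etch")
  #
  # Only ial.success, ial.info, ial.nodes and ial.required_nodes are consumed
  # from the result: the first returned node becomes the primary, the optional
  # second one the mirror (DRBD secondary) node.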

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
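
  # For an import, the hooks for this LU therefore see, in addition to
  # whatever _BuildInstanceHookEnv contributes, entries along the lines of
  # the following sketch (values shown are hypothetical):
  #
  #   INSTANCE_DISK_TEMPLATE=plain   INSTANCE_DISK_SIZE=10240
  #   INSTANCE_SWAP_SIZE=4096        INSTANCE_ADD_MODE=import
  #   INSTANCE_SRC_NODE=node2.example.com
  #   INSTANCE_SRC_PATH=/srv/ganeti/export/inst1.example.com
  #   INSTANCE_SRC_IMAGE=<path of the disk0 dump inside the export>
  #
  # and the hook node list (nl) is the master plus the primary and all
  # secondaries of the new instance.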

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")

    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
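
# A note on the file-storage path built in Exec() above (the directory shown
# is hypothetical): the empty string used when no file_storage_dir was given
# simply disappears from the joined path,
#
#   os.path.normpath(os.path.join("/srv/ganeti/file-storage", "",
#                                 "inst1.example.com"))
#     -> "/srv/ganeti/file-storage/inst1.example.com"
#
# while a relative directory such as "web" ends up as
# "/srv/ganeti/file-storage/web/inst1.example.com".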

class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
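
# Since this LU only *returns* the ssh command line, the caller is expected to
# exec it itself.  A minimal, hypothetical sketch of that client-side step
# (assuming the opcode result is the argv list produced by BuildCmd above):
#
#   cmd = SubmitOpCode(opcodes.OpConnectConsole(instance_name=name))
#   os.execvp(cmd[0], cmd)   # replace the current process with the ssh call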

class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced-<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced-<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv.logical_id[1],
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
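
  # Worked example of the rename scheme above (volume group and LV names are
  # hypothetical): with temp_suffix = 1199145600, an old data LV whose
  # physical_id is ("xenvg", "<id>.sda_data") is first renamed to
  # ("xenvg", "<id>.sda_data_replaced-1199145600"); the freshly created LV
  # then takes over the original "<id>.sda_data" name before being re-attached
  # to the drbd device, and step 6 finally deletes the "_replaced-" volumes
  # once the resync has finished.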

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks: