lib/cmdlib.py @ 49ce1563
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note additional keys will
    be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    "No nodes" should be represented as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)

def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env

def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None

def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

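  # back up and remove any existing keys so a fresh pair can be generated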
  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

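  # generate a self-signed SSL certificate valid for five years; the key and
  # the certificate are written to the same file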
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))

class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you gave is"
                                 " not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
    ss.SetKey(ss.SS_CONFIG_VERSION, constants.CONFIG_VERSION)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
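    # the host key file has the form "<type> <key> [comment]"; keep the key only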
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)

class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure causes
    the output to be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
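    # parameters for the node_verify RPC: files to checksum, nodes to test ssh
    # connectivity to, and (name, primary ip, secondary ip) tuples for the tcp
    # connectivity test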
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
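    # the result groups: nodes that could not be contacted, per-node LVM
    # errors, instances with non-online volumes, and instances with missing
    # volumes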
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")

def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

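  # poll the mirror status, giving up after ten consecutive failed RPC calls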
  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
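  # in the status returned by call_blockdev_find, index 5 is the overall
  # is_degraded flag and index 6 the local disk (ldisk) status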
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output

class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    
1534
    if self.dynamic_fields.intersection(self.op.output_fields):
1535
      live_data = {}
1536
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1537
      for name in nodenames:
1538
        nodeinfo = node_data.get(name, None)
1539
        if nodeinfo:
1540
          live_data[name] = {
1541
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1542
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1543
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1544
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1545
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1546
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1547
            "bootid": nodeinfo['bootid'],
1548
            }
1549
        else:
1550
          live_data[name] = {}
1551
    else:
1552
      live_data = dict.fromkeys(nodenames, {})
1553

    
1554
    node_to_primary = dict([(name, set()) for name in nodenames])
1555
    node_to_secondary = dict([(name, set()) for name in nodenames])
1556

    
1557
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1558
                             "sinst_cnt", "sinst_list"))
1559
    if inst_fields & frozenset(self.op.output_fields):
1560
      instancelist = self.cfg.GetInstanceList()
1561

    
1562
      for instance_name in instancelist:
1563
        inst = self.cfg.GetInstanceInfo(instance_name)
1564
        if inst.primary_node in node_to_primary:
1565
          node_to_primary[inst.primary_node].add(inst.name)
1566
        for secnode in inst.secondary_nodes:
1567
          if secnode in node_to_secondary:
1568
            node_to_secondary[secnode].add(inst.name)
1569

    
1570
    # end data gathering
1571

    
1572
    output = []
1573
    for node in nodelist:
1574
      node_output = []
1575
      for field in self.op.output_fields:
1576
        if field == "name":
1577
          val = node.name
1578
        elif field == "pinst_list":
1579
          val = list(node_to_primary[node.name])
1580
        elif field == "sinst_list":
1581
          val = list(node_to_secondary[node.name])
1582
        elif field == "pinst_cnt":
1583
          val = len(node_to_primary[node.name])
1584
        elif field == "sinst_cnt":
1585
          val = len(node_to_secondary[node.name])
1586
        elif field == "pip":
1587
          val = node.primary_ip
1588
        elif field == "sip":
1589
          val = node.secondary_ip
1590
        elif field in self.dynamic_fields:
1591
          val = live_data[node.name].get(field, None)
1592
        else:
1593
          raise errors.ParameterError(field)
1594
        node_output.append(val)
1595
      output.append(node_output)
1596

    
1597
    return output
1598

    
1599

    
1600
class LUQueryNodeVolumes(NoHooksLU):
1601
  """Logical unit for getting volumes on node(s).
1602

1603
  """
1604
  _OP_REQP = ["nodes", "output_fields"]
1605

    
1606
  def CheckPrereq(self):
1607
    """Check prerequisites.
1608

1609
    This checks that the fields required are valid output fields.
1610

1611
    """
1612
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1613

    
1614
    _CheckOutputFields(static=["node"],
1615
                       dynamic=["phys", "vg", "name", "size", "instance"],
1616
                       selected=self.op.output_fields)
1617

    
1618

    
1619
  def Exec(self, feedback_fn):
1620
    """Computes the list of nodes and their attributes.
1621

1622
    """
1623
    nodenames = self.nodes
1624
    volumes = rpc.call_node_volumes(nodenames)
1625

    
1626
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1627
             in self.cfg.GetInstanceList()]
1628

    
1629
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1630

    
1631
    output = []
1632
    for node in nodenames:
1633
      if node not in volumes or not volumes[node]:
1634
        continue
1635

    
1636
      node_vols = volumes[node][:]
1637
      node_vols.sort(key=lambda vol: vol['dev'])
1638

    
1639
      for vol in node_vols:
1640
        node_output = []
1641
        for field in self.op.output_fields:
1642
          if field == "node":
1643
            val = node
1644
          elif field == "phys":
1645
            val = vol['dev']
1646
          elif field == "vg":
1647
            val = vol['vg']
1648
          elif field == "name":
1649
            val = vol['name']
1650
          elif field == "size":
1651
            val = int(float(vol['size']))
1652
          elif field == "instance":
1653
            for inst in ilist:
1654
              if node not in lv_by_node[inst]:
1655
                continue
1656
              if vol['name'] in lv_by_node[inst][node]:
1657
                val = inst.name
1658
                break
1659
            else:
1660
              val = '-'
1661
          else:
1662
            raise errors.ParameterError(field)
1663
          node_output.append(str(val))
1664

    
1665
        output.append(node_output)
1666

    
1667
    return output
1668

    
1669

    
1670
class LUAddNode(LogicalUnit):
1671
  """Logical unit for adding node to the cluster.
1672

1673
  """
1674
  HPATH = "node-add"
1675
  HTYPE = constants.HTYPE_NODE
1676
  _OP_REQP = ["node_name"]
1677

    
1678
  def BuildHooksEnv(self):
1679
    """Build hooks env.
1680

1681
    This will run on all nodes before, and on all nodes + the new node after.
1682

1683
    """
1684
    env = {
1685
      "OP_TARGET": self.op.node_name,
1686
      "NODE_NAME": self.op.node_name,
1687
      "NODE_PIP": self.op.primary_ip,
1688
      "NODE_SIP": self.op.secondary_ip,
1689
      }
1690
    nodes_0 = self.cfg.GetNodeList()
1691
    nodes_1 = nodes_0 + [self.op.node_name, ]
1692
    return env, nodes_0, nodes_1
1693

    
1694
  def CheckPrereq(self):
1695
    """Check prerequisites.
1696

1697
    This checks:
1698
     - the new node is not already in the config
1699
     - it is resolvable
1700
     - its parameters (single/dual homed) match the cluster
1701

1702
    Any errors are signalled by raising errors.OpPrereqError.
1703

1704
    """
1705
    node_name = self.op.node_name
1706
    cfg = self.cfg
1707

    
1708
    dns_data = utils.HostInfo(node_name)
1709

    
1710
    node = dns_data.name
1711
    primary_ip = self.op.primary_ip = dns_data.ip
1712
    secondary_ip = getattr(self.op, "secondary_ip", None)
1713
    if secondary_ip is None:
1714
      secondary_ip = primary_ip
1715
    if not utils.IsValidIP(secondary_ip):
1716
      raise errors.OpPrereqError("Invalid secondary IP given")
1717
    self.op.secondary_ip = secondary_ip
1718

    
1719
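    # a plain add must not find the node in the config, a readd must find it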
    node_list = cfg.GetNodeList()
1720
    if not self.op.readd and node in node_list:
1721
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1722
                                 node)
1723
    elif self.op.readd and node not in node_list:
1724
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1725

    
1726
    for existing_node_name in node_list:
1727
      existing_node = cfg.GetNodeInfo(existing_node_name)
1728

    
1729
      if self.op.readd and node == existing_node_name:
1730
        if (existing_node.primary_ip != primary_ip or
1731
            existing_node.secondary_ip != secondary_ip):
1732
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1733
                                     " address configuration as before")
1734
        continue
1735

    
1736
      if (existing_node.primary_ip == primary_ip or
1737
          existing_node.secondary_ip == primary_ip or
1738
          existing_node.primary_ip == secondary_ip or
1739
          existing_node.secondary_ip == secondary_ip):
1740
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1741
                                   " existing node %s" % existing_node.name)
1742

    
1743
    # check that the type of the node (single versus dual homed) is the
1744
    # same as for the master
1745
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1746
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1747
    newbie_singlehomed = secondary_ip == primary_ip
1748
    if master_singlehomed != newbie_singlehomed:
1749
      if master_singlehomed:
1750
        raise errors.OpPrereqError("The master has no private ip but the"
1751
                                   " new node has one")
1752
      else:
1753
        raise errors.OpPrereqError("The master has a private ip but the"
1754
                                   " new node doesn't have one")
1755

    
1756
    # checks reachability
1757
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1758
      raise errors.OpPrereqError("Node not reachable by ping")
1759

    
1760
    if not newbie_singlehomed:
1761
      # check reachability from my secondary ip to newbie's secondary ip
1762
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1763
                           source=myself.secondary_ip):
1764
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1765
                                   " based ping to noded port")
1766

    
1767
    self.new_node = objects.Node(name=node,
1768
                                 primary_ip=primary_ip,
1769
                                 secondary_ip=secondary_ip)
1770

    
1771
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1772
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1773
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1774
                                   constants.VNC_PASSWORD_FILE)
1775

    
1776
  def Exec(self, feedback_fn):
1777
    """Adds the new node to the cluster.
1778

1779
    """
1780
    new_node = self.new_node
1781
    node = new_node.name
1782

    
1783
    # set up inter-node password and certificate and restart the node daemon
1784
    gntpass = self.sstore.GetNodeDaemonPassword()
1785
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1786
      raise errors.OpExecError("ganeti password corruption detected")
1787
    f = open(constants.SSL_CERT_FILE)
1788
    try:
1789
      gntpem = f.read(8192)
1790
    finally:
1791
      f.close()
1792
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1793
    # so we use this to detect an invalid certificate; as long as the
1794
    # cert doesn't contain this, the here-document will be correctly
1795
    # parsed by the shell sequence below
1796
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1797
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1798
    if not gntpem.endswith("\n"):
1799
      raise errors.OpExecError("PEM must end with newline")
1800
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1801

    
1802
    # and then connect with ssh to set password and start ganeti-noded
1803
    # note that all the below variables are sanitized at this point,
1804
    # either by being constants or by the checks above
1805
    ss = self.sstore
1806
    mycommand = ("umask 077 && "
1807
                 "echo '%s' > '%s' && "
1808
                 "cat > '%s' << '!EOF.' && \n"
1809
                 "%s!EOF.\n%s restart" %
1810
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1811
                  constants.SSL_CERT_FILE, gntpem,
1812
                  constants.NODE_INITD_SCRIPT))
1813

    
1814
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1815
    if result.failed:
1816
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1817
                               " output: %s" %
1818
                               (node, result.fail_reason, result.output))
1819

    
1820
    # check connectivity
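    # give the node daemon a moment to come up before the version check below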
1821
    time.sleep(4)
1822

    
1823
    result = rpc.call_version([node])[node]
1824
    if result:
1825
      if constants.PROTOCOL_VERSION == result:
1826
        logger.Info("communication to node %s fine, sw version %s match" %
1827
                    (node, result))
1828
      else:
1829
        raise errors.OpExecError("Version mismatch master version %s,"
1830
                                 " node version %s" %
1831
                                 (constants.PROTOCOL_VERSION, result))
1832
    else:
1833
      raise errors.OpExecError("Cannot get version from the new node")
1834

    
1835
    # setup ssh on node
1836
    logger.Info("copy ssh key to node %s" % node)
1837
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1838
    keyarray = []
1839
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1840
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1841
                priv_key, pub_key]
1842

    
1843
    for i in keyfiles:
1844
      f = open(i, 'r')
1845
      try:
1846
        keyarray.append(f.read())
1847
      finally:
1848
        f.close()
1849

    
1850
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1851
                               keyarray[3], keyarray[4], keyarray[5])
1852

    
1853
    if not result:
1854
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1855

    
1856
    # Add node to our /etc/hosts, and add key to known_hosts
1857
    _AddHostToEtcHosts(new_node.name)
1858

    
1859
    if new_node.secondary_ip != new_node.primary_ip:
1860
      if not rpc.call_node_tcp_ping(new_node.name,
1861
                                    constants.LOCALHOST_IP_ADDRESS,
1862
                                    new_node.secondary_ip,
1863
                                    constants.DEFAULT_NODED_PORT,
1864
                                    10, False):
1865
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1866
                                 " you gave (%s). Please fix and re-run this"
1867
                                 " command." % new_node.secondary_ip)
1868

    
1869
    success, msg = self.ssh.VerifyNodeHostname(node)
1870
    if not success:
1871
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1872
                               " than the one the resolver gives: %s."
1873
                               " Please fix and re-run this command." %
1874
                               (node, msg))
1875

    
1876
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1877
    # including the node just added
1878
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1879
    dist_nodes = self.cfg.GetNodeList()
1880
    if not self.op.readd:
1881
      dist_nodes.append(node)
1882
    if myself.name in dist_nodes:
1883
      dist_nodes.remove(myself.name)
1884

    
1885
    logger.Debug("Copying hosts and known_hosts to all nodes")
1886
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1887
      result = rpc.call_upload_file(dist_nodes, fname)
1888
      for to_node in dist_nodes:
1889
        if not result[to_node]:
1890
          logger.Error("copy of file %s to node %s failed" %
1891
                       (fname, to_node))
1892

    
1893
    to_copy = ss.GetFileList()
1894
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1895
      to_copy.append(constants.VNC_PASSWORD_FILE)
1896
    for fname in to_copy:
1897
      if not self.ssh.CopyFileToNode(node, fname):
1898
        logger.Error("could not copy file %s to node %s" % (fname, node))
1899

    
1900
    if not self.op.readd:
1901
      logger.Info("adding node %s to cluster.conf" % node)
1902
      self.cfg.AddNode(new_node)
1903

    
1904

    
1905
class LUMasterFailover(LogicalUnit):
1906
  """Failover the master node to the current node.
1907

1908
  This is a special LU in that it must run on a non-master node.
1909

1910
  """
1911
  HPATH = "master-failover"
1912
  HTYPE = constants.HTYPE_CLUSTER
1913
  REQ_MASTER = False
1914
  _OP_REQP = []
1915

    
1916
  def BuildHooksEnv(self):
1917
    """Build hooks env.
1918

1919
    This will run on the new master only in the pre phase, and on all
1920
    the nodes in the post phase.
1921

1922
    """
1923
    env = {
1924
      "OP_TARGET": self.new_master,
1925
      "NEW_MASTER": self.new_master,
1926
      "OLD_MASTER": self.old_master,
1927
      }
1928
    return env, [self.new_master], self.cfg.GetNodeList()
1929

    
1930
  def CheckPrereq(self):
1931
    """Check prerequisites.
1932

1933
    This checks that we are not already the master.
1934

1935
    """
1936
    self.new_master = utils.HostInfo().name
1937
    self.old_master = self.sstore.GetMasterNode()
1938

    
1939
    if self.old_master == self.new_master:
1940
      raise errors.OpPrereqError("This commands must be run on the node"
1941
                                 " where you want the new master to be."
1942
                                 " %s is already the master" %
1943
                                 self.old_master)
1944

    
1945
  def Exec(self, feedback_fn):
1946
    """Failover the master node.
1947

1948
    This command, when run on a non-master node, will cause the current
1949
    master to cease being master, and the non-master to become new
1950
    master.
1951

1952
    """
1953
    #TODO: do not rely on gethostname returning the FQDN
1954
    logger.Info("setting master to %s, old master: %s" %
1955
                (self.new_master, self.old_master))
1956

    
1957
    if not rpc.call_node_stop_master(self.old_master):
1958
      logger.Error("could disable the master role on the old master"
1959
                   " %s, please disable manually" % self.old_master)
1960

    
1961
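    # record the new master in ssconf and distribute the updated file to all
    # nodes before activating the master role on the new master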
    ss = self.sstore
1962
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1963
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1964
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1965
      logger.Error("could not distribute the new simple store master file"
1966
                   " to the other nodes, please check.")
1967

    
1968
    if not rpc.call_node_start_master(self.new_master):
1969
      logger.Error("could not start the master role on the new master"
1970
                   " %s, please check" % self.new_master)
1971
      feedback_fn("Error in activating the master IP on the new master,"
1972
                  " please fix manually.")
1973

    
1974

    
1975

    
1976
class LUQueryClusterInfo(NoHooksLU):
1977
  """Query cluster configuration.
1978

1979
  """
1980
  _OP_REQP = []
1981
  REQ_MASTER = False
1982

    
1983
  def CheckPrereq(self):
1984
    """No prerequsites needed for this LU.
1985

1986
    """
1987
    pass
1988

    
1989
  def Exec(self, feedback_fn):
1990
    """Return cluster config.
1991

1992
    """
1993
    result = {
1994
      "name": self.sstore.GetClusterName(),
1995
      "software_version": constants.RELEASE_VERSION,
1996
      "protocol_version": constants.PROTOCOL_VERSION,
1997
      "config_version": constants.CONFIG_VERSION,
1998
      "os_api_version": constants.OS_API_VERSION,
1999
      "export_version": constants.EXPORT_VERSION,
2000
      "master": self.sstore.GetMasterNode(),
2001
      "architecture": (platform.architecture()[0], platform.machine()),
2002
      "hypervisor_type": self.sstore.GetHypervisorType(),
2003
      }
2004

    
2005
    return result
2006

    
2007

    
2008
class LUClusterCopyFile(NoHooksLU):
2009
  """Copy file to cluster.
2010

2011
  """
2012
  _OP_REQP = ["nodes", "filename"]
2013

    
2014
  def CheckPrereq(self):
2015
    """Check prerequisites.
2016

2017
    It should check that the named file exists and that the given list
2018
    of nodes is valid.
2019

2020
    """
2021
    if not os.path.exists(self.op.filename):
2022
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2023

    
2024
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2025

    
2026
  def Exec(self, feedback_fn):
2027
    """Copy a file from master to some nodes.
2028

2029
    Args:
2030
      opts - class with options as members
2031
      args - list containing a single element, the file name
2032
    Opts used:
2033
      nodes - list containing the name of target nodes; if empty, all nodes
2034

2035
    """
2036
    filename = self.op.filename
2037

    
2038
    myname = utils.HostInfo().name
2039

    
2040
    for node in self.nodes:
2041
      if node == myname:
2042
        continue
2043
      if not self.ssh.CopyFileToNode(node, filename):
2044
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
2045

    
2046

    
2047
class LUDumpClusterConfig(NoHooksLU):
2048
  """Return a text-representation of the cluster-config.
2049

2050
  """
2051
  _OP_REQP = []
2052

    
2053
  def CheckPrereq(self):
2054
    """No prerequisites.
2055

2056
    """
2057
    pass
2058

    
2059
  def Exec(self, feedback_fn):
2060
    """Dump a representation of the cluster config to the standard output.
2061

2062
    """
2063
    return self.cfg.DumpConfig()
2064

    
2065

    
2066
class LURunClusterCommand(NoHooksLU):
2067
  """Run a command on some nodes.
2068

2069
  """
2070
  _OP_REQP = ["command", "nodes"]
2071

    
2072
  def CheckPrereq(self):
2073
    """Check prerequisites.
2074

2075
    It checks that the given list of nodes is valid.
2076

2077
    """
2078
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2079

    
2080
  def Exec(self, feedback_fn):
2081
    """Run a command on some nodes.
2082

2083
    """
2084
    # put the master at the end of the nodes list
2085
    master_node = self.sstore.GetMasterNode()
2086
    if master_node in self.nodes:
2087
      self.nodes.remove(master_node)
2088
      self.nodes.append(master_node)
2089

    
2090
    data = []
2091
    for node in self.nodes:
2092
      result = self.ssh.Run(node, "root", self.op.command)
2093
      data.append((node, result.output, result.exit_code))
2094

    
2095
    return data
2096

    
2097

    
2098
class LUActivateInstanceDisks(NoHooksLU):
2099
  """Bring up an instance's disks.
2100

2101
  """
2102
  _OP_REQP = ["instance_name"]
2103

    
2104
  def CheckPrereq(self):
2105
    """Check prerequisites.
2106

2107
    This checks that the instance is in the cluster.
2108

2109
    """
2110
    instance = self.cfg.GetInstanceInfo(
2111
      self.cfg.ExpandInstanceName(self.op.instance_name))
2112
    if instance is None:
2113
      raise errors.OpPrereqError("Instance '%s' not known" %
2114
                                 self.op.instance_name)
2115
    self.instance = instance
2116

    
2117

    
2118
  def Exec(self, feedback_fn):
2119
    """Activate the disks.
2120

2121
    """
2122
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2123
    if not disks_ok:
2124
      raise errors.OpExecError("Cannot activate block devices")
2125

    
2126
    return disks_info
2127

    
2128

    
2129
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2130
  """Prepare the block devices for an instance.
2131

2132
  This sets up the block devices on all nodes.
2133

2134
  Args:
2135
    instance: a ganeti.objects.Instance object
2136
    ignore_secondaries: if true, errors on secondary nodes won't result
2137
                        in an error return from the function
2138

2139
  Returns:
2140
    false if the operation failed
2141
    list of (host, instance_visible_name, node_visible_name) if the operation
2142
         succeeded with the mapping from node devices to instance devices
2143
  """
2144
  device_info = []
2145
  disks_ok = True
2146
  iname = instance.name
2147
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking has occurred, but we do not eliminate it
2150

    
2151
  # The proper fix would be to wait (with some limits) until the
2152
  # connection has been made and drbd transitions from WFConnection
2153
  # into any other network-connected state (Connected, SyncTarget,
2154
  # SyncSource, etc.)
2155

    
2156
  # 1st pass, assemble on all nodes in secondary mode
2157
  for inst_disk in instance.disks:
2158
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2159
      cfg.SetDiskID(node_disk, node)
2160
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2161
      if not result:
2162
        logger.Error("could not prepare block device %s on node %s"
2163
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2164
        if not ignore_secondaries:
2165
          disks_ok = False
2166

    
2167
  # FIXME: race condition on drbd migration to primary
2168

    
2169
  # 2nd pass, do only the primary node
2170
  for inst_disk in instance.disks:
2171
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2172
      if node != instance.primary_node:
2173
        continue
2174
      cfg.SetDiskID(node_disk, node)
2175
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2176
      if not result:
2177
        logger.Error("could not prepare block device %s on node %s"
2178
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2179
        disks_ok = False
2180
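    # record the primary node's view of this disk for the caller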
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2181

    
2182
  # leave the disks configured for the primary node
2183
  # this is a workaround that would be fixed better by
2184
  # improving the logical/physical id handling
2185
  for disk in instance.disks:
2186
    cfg.SetDiskID(disk, instance.primary_node)
2187

    
2188
  return disks_ok, device_info
2189

    
2190

    
2191
def _StartInstanceDisks(cfg, instance, force):
2192
  """Start the disks of an instance.
2193

2194
  """
2195
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2196
                                           ignore_secondaries=force)
2197
  if not disks_ok:
2198
    _ShutdownInstanceDisks(instance, cfg)
2199
    if force is not None and not force:
2200
      logger.Error("If the message above refers to a secondary node,"
2201
                   " you can retry the operation using '--force'.")
2202
    raise errors.OpExecError("Disk consistency error")
2203

    
2204

    
2205
class LUDeactivateInstanceDisks(NoHooksLU):
2206
  """Shutdown an instance's disks.
2207

2208
  """
2209
  _OP_REQP = ["instance_name"]
2210

    
2211
  def CheckPrereq(self):
2212
    """Check prerequisites.
2213

2214
    This checks that the instance is in the cluster.
2215

2216
    """
2217
    instance = self.cfg.GetInstanceInfo(
2218
      self.cfg.ExpandInstanceName(self.op.instance_name))
2219
    if instance is None:
2220
      raise errors.OpPrereqError("Instance '%s' not known" %
2221
                                 self.op.instance_name)
2222
    self.instance = instance
2223

    
2224
  def Exec(self, feedback_fn):
2225
    """Deactivate the disks
2226

2227
    """
2228
    instance = self.instance
2229
    ins_l = rpc.call_instance_list([instance.primary_node])
2230
    ins_l = ins_l[instance.primary_node]
2231
    if not type(ins_l) is list:
2232
      raise errors.OpExecError("Can't contact node '%s'" %
2233
                               instance.primary_node)
2234

    
2235
    if self.instance.name in ins_l:
2236
      raise errors.OpExecError("Instance is running, can't shutdown"
2237
                               " block devices.")
2238

    
2239
    _ShutdownInstanceDisks(instance, self.cfg)
2240

    
2241

    
2242
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2243
  """Shutdown block devices of an instance.
2244

2245
  This does the shutdown on all nodes of the instance.
2246

2247
  If ignore_primary is true, errors on the primary node are
  ignored.
2249

2250
  """
2251
  result = True
2252
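  # the result stays True only if every non-ignored blockdev shutdown worked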
  for disk in instance.disks:
2253
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2254
      cfg.SetDiskID(top_disk, node)
2255
      if not rpc.call_blockdev_shutdown(node, top_disk):
2256
        logger.Error("could not shutdown block device %s on node %s" %
2257
                     (disk.iv_name, node))
2258
        if not ignore_primary or node != instance.primary_node:
2259
          result = False
2260
  return result
2261

    
2262

    
2263
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2264
  """Checks if a node has enough free memory.
2265

2266
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2270

2271
  Args:
2272
    - cfg: a ConfigWriter instance
2273
    - node: the node name
2274
    - reason: string to use in the error message
2275
    - requested: the amount of memory in MiB
2276

2277
  """
2278
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2279
  if not nodeinfo or not isinstance(nodeinfo, dict):
2280
    raise errors.OpPrereqError("Could not contact node %s for resource"
2281
                             " information" % (node,))
2282

    
2283
  free_mem = nodeinfo[node].get('memory_free')
2284
  if not isinstance(free_mem, int):
2285
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2286
                             " was '%s'" % (node, free_mem))
2287
  if requested > free_mem:
2288
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2289
                             " needed %s MiB, available %s MiB" %
2290
                             (node, reason, requested, free_mem))
2291

    
2292

    
2293
class LUStartupInstance(LogicalUnit):
2294
  """Starts an instance.
2295

2296
  """
2297
  HPATH = "instance-start"
2298
  HTYPE = constants.HTYPE_INSTANCE
2299
  _OP_REQP = ["instance_name", "force"]
2300

    
2301
  def BuildHooksEnv(self):
2302
    """Build hooks env.
2303

2304
    This runs on master, primary and secondary nodes of the instance.
2305

2306
    """
2307
    env = {
2308
      "FORCE": self.op.force,
2309
      }
2310
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2311
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2312
          list(self.instance.secondary_nodes))
2313
    return env, nl, nl
2314

    
2315
  def CheckPrereq(self):
2316
    """Check prerequisites.
2317

2318
    This checks that the instance is in the cluster.
2319

2320
    """
2321
    instance = self.cfg.GetInstanceInfo(
2322
      self.cfg.ExpandInstanceName(self.op.instance_name))
2323
    if instance is None:
2324
      raise errors.OpPrereqError("Instance '%s' not known" %
2325
                                 self.op.instance_name)
2326

    
2327
    # check bridges existence
2328
    _CheckInstanceBridgesExist(instance)
2329

    
2330
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2331
                         "starting instance %s" % instance.name,
2332
                         instance.memory)
2333

    
2334
    self.instance = instance
2335
    self.op.instance_name = instance.name
2336

    
2337
  def Exec(self, feedback_fn):
2338
    """Start the instance.
2339

2340
    """
2341
    instance = self.instance
2342
    force = self.op.force
2343
    extra_args = getattr(self.op, "extra_args", "")
2344

    
2345
    self.cfg.MarkInstanceUp(instance.name)
2346

    
2347
    node_current = instance.primary_node
2348

    
2349
    _StartInstanceDisks(self.cfg, instance, force)
2350

    
2351
    if not rpc.call_instance_start(node_current, instance, extra_args):
2352
      _ShutdownInstanceDisks(instance, self.cfg)
2353
      raise errors.OpExecError("Could not start instance")
2354

    
2355

    
2356
class LURebootInstance(LogicalUnit):
2357
  """Reboot an instance.
2358

2359
  """
2360
  HPATH = "instance-reboot"
2361
  HTYPE = constants.HTYPE_INSTANCE
2362
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2363

    
2364
  def BuildHooksEnv(self):
2365
    """Build hooks env.
2366

2367
    This runs on master, primary and secondary nodes of the instance.
2368

2369
    """
2370
    env = {
2371
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2372
      }
2373
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2374
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2375
          list(self.instance.secondary_nodes))
2376
    return env, nl, nl
2377

    
2378
  def CheckPrereq(self):
2379
    """Check prerequisites.
2380

2381
    This checks that the instance is in the cluster.
2382

2383
    """
2384
    instance = self.cfg.GetInstanceInfo(
2385
      self.cfg.ExpandInstanceName(self.op.instance_name))
2386
    if instance is None:
2387
      raise errors.OpPrereqError("Instance '%s' not known" %
2388
                                 self.op.instance_name)
2389

    
2390
    # check bridges existence
2391
    _CheckInstanceBridgesExist(instance)
2392

    
2393
    self.instance = instance
2394
    self.op.instance_name = instance.name
2395

    
2396
  def Exec(self, feedback_fn):
2397
    """Reboot the instance.
2398

2399
    """
2400
    instance = self.instance
2401
    ignore_secondaries = self.op.ignore_secondaries
2402
    reboot_type = self.op.reboot_type
2403
    extra_args = getattr(self.op, "extra_args", "")
2404

    
2405
    node_current = instance.primary_node
2406

    
2407
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2408
                           constants.INSTANCE_REBOOT_HARD,
2409
                           constants.INSTANCE_REBOOT_FULL]:
2410
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2411
                                  (constants.INSTANCE_REBOOT_SOFT,
2412
                                   constants.INSTANCE_REBOOT_HARD,
2413
                                   constants.INSTANCE_REBOOT_FULL))
2414

    
2415
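    # soft and hard reboots are handled by the hypervisor on the primary
    # node; a full reboot is emulated by a shutdown followed by a fresh start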
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2416
                       constants.INSTANCE_REBOOT_HARD]:
2417
      if not rpc.call_instance_reboot(node_current, instance,
2418
                                      reboot_type, extra_args):
2419
        raise errors.OpExecError("Could not reboot instance")
2420
    else:
2421
      if not rpc.call_instance_shutdown(node_current, instance):
2422
        raise errors.OpExecError("could not shutdown instance for full reboot")
2423
      _ShutdownInstanceDisks(instance, self.cfg)
2424
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2425
      if not rpc.call_instance_start(node_current, instance, extra_args):
2426
        _ShutdownInstanceDisks(instance, self.cfg)
2427
        raise errors.OpExecError("Could not start instance for full reboot")
2428

    
2429
    self.cfg.MarkInstanceUp(instance.name)
2430

    
2431

    
2432
class LUShutdownInstance(LogicalUnit):
2433
  """Shutdown an instance.
2434

2435
  """
2436
  HPATH = "instance-stop"
2437
  HTYPE = constants.HTYPE_INSTANCE
2438
  _OP_REQP = ["instance_name"]
2439

    
2440
  def BuildHooksEnv(self):
2441
    """Build hooks env.
2442

2443
    This runs on master, primary and secondary nodes of the instance.
2444

2445
    """
2446
    env = _BuildInstanceHookEnvByObject(self.instance)
2447
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2448
          list(self.instance.secondary_nodes))
2449
    return env, nl, nl
2450

    
2451
  def CheckPrereq(self):
2452
    """Check prerequisites.
2453

2454
    This checks that the instance is in the cluster.
2455

2456
    """
2457
    instance = self.cfg.GetInstanceInfo(
2458
      self.cfg.ExpandInstanceName(self.op.instance_name))
2459
    if instance is None:
2460
      raise errors.OpPrereqError("Instance '%s' not known" %
2461
                                 self.op.instance_name)
2462
    self.instance = instance
2463

    
2464
  def Exec(self, feedback_fn):
2465
    """Shutdown the instance.
2466

2467
    """
2468
    instance = self.instance
2469
    node_current = instance.primary_node
2470
    self.cfg.MarkInstanceDown(instance.name)
2471
    if not rpc.call_instance_shutdown(node_current, instance):
2472
      logger.Error("could not shutdown instance")
2473

    
2474
    _ShutdownInstanceDisks(instance, self.cfg)
2475

    
2476

    
2477
class LUReinstallInstance(LogicalUnit):
2478
  """Reinstall an instance.
2479

2480
  """
2481
  HPATH = "instance-reinstall"
2482
  HTYPE = constants.HTYPE_INSTANCE
2483
  _OP_REQP = ["instance_name"]
2484

    
2485
  def BuildHooksEnv(self):
2486
    """Build hooks env.
2487

2488
    This runs on master, primary and secondary nodes of the instance.
2489

2490
    """
2491
    env = _BuildInstanceHookEnvByObject(self.instance)
2492
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2493
          list(self.instance.secondary_nodes))
2494
    return env, nl, nl
2495

    
2496
  def CheckPrereq(self):
2497
    """Check prerequisites.
2498

2499
    This checks that the instance is in the cluster and is not running.
2500

2501
    """
2502
    instance = self.cfg.GetInstanceInfo(
2503
      self.cfg.ExpandInstanceName(self.op.instance_name))
2504
    if instance is None:
2505
      raise errors.OpPrereqError("Instance '%s' not known" %
2506
                                 self.op.instance_name)
2507
    if instance.disk_template == constants.DT_DISKLESS:
2508
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2509
                                 self.op.instance_name)
2510
    if instance.status != "down":
2511
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2512
                                 self.op.instance_name)
2513
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2514
    if remote_info:
2515
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2516
                                 (self.op.instance_name,
2517
                                  instance.primary_node))
2518

    
2519
    self.op.os_type = getattr(self.op, "os_type", None)
2520
    if self.op.os_type is not None:
2521
      # OS verification
2522
      pnode = self.cfg.GetNodeInfo(
2523
        self.cfg.ExpandNodeName(instance.primary_node))
2524
      if pnode is None:
2525
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2526
                                   instance.primary_node)
2527
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2528
      if not os_obj:
2529
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2530
                                   " primary node"  % self.op.os_type)
2531

    
2532
    self.instance = instance
2533

    
2534
  def Exec(self, feedback_fn):
2535
    """Reinstall the instance.
2536

2537
    """
2538
    inst = self.instance
2539

    
2540
    if self.op.os_type is not None:
2541
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2542
      inst.os = self.op.os_type
2543
      self.cfg.AddInstance(inst)
2544

    
2545
    _StartInstanceDisks(self.cfg, inst, None)
2546
    try:
2547
      feedback_fn("Running the instance OS create scripts...")
2548
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2549
        raise errors.OpExecError("Could not install OS for instance %s"
2550
                                 " on node %s" %
2551
                                 (inst.name, inst.primary_node))
2552
    finally:
2553
      _ShutdownInstanceDisks(inst, self.cfg)
2554

    
2555

    
2556
class LURenameInstance(LogicalUnit):
2557
  """Rename an instance.
2558

2559
  """
2560
  HPATH = "instance-rename"
2561
  HTYPE = constants.HTYPE_INSTANCE
2562
  _OP_REQP = ["instance_name", "new_name"]
2563

    
2564
  def BuildHooksEnv(self):
2565
    """Build hooks env.
2566

2567
    This runs on master, primary and secondary nodes of the instance.
2568

2569
    """
2570
    env = _BuildInstanceHookEnvByObject(self.instance)
2571
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2572
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2573
          list(self.instance.secondary_nodes))
2574
    return env, nl, nl
2575

    
2576
  def CheckPrereq(self):
2577
    """Check prerequisites.
2578

2579
    This checks that the instance is in the cluster and is not running.
2580

2581
    """
2582
    instance = self.cfg.GetInstanceInfo(
2583
      self.cfg.ExpandInstanceName(self.op.instance_name))
2584
    if instance is None:
2585
      raise errors.OpPrereqError("Instance '%s' not known" %
2586
                                 self.op.instance_name)
2587
    if instance.status != "down":
2588
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2589
                                 self.op.instance_name)
2590
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2591
    if remote_info:
2592
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2593
                                 (self.op.instance_name,
2594
                                  instance.primary_node))
2595
    self.instance = instance
2596

    
2597
    # new name verification
2598
    name_info = utils.HostInfo(self.op.new_name)
2599

    
2600
    self.op.new_name = new_name = name_info.name
2601
    instance_list = self.cfg.GetInstanceList()
2602
    if new_name in instance_list:
2603
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2604
                                 new_name)
2605

    
2606
    if not getattr(self.op, "ignore_ip", False):
2607
      command = ["fping", "-q", name_info.ip]
2608
      result = utils.RunCmd(command)
2609
      if not result.failed:
2610
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2611
                                   (name_info.ip, new_name))
2612

    
2613

    
2614
  def Exec(self, feedback_fn):
2615
    """Reinstall the instance.
2616

2617
    """
2618
    inst = self.instance
2619
    old_name = inst.name
2620

    
2621
    if inst.disk_template == constants.DT_FILE:
2622
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2623

    
2624
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2625

    
2626
    # re-read the instance from the configuration after rename
2627
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2628

    
2629
    if inst.disk_template == constants.DT_FILE:
2630
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2631
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2632
                                                old_file_storage_dir,
2633
                                                new_file_storage_dir)
2634

    
2635
      if not result:
2636
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2637
                                 " directory '%s' to '%s' (but the instance"
2638
                                 " has been renamed in Ganeti)" % (
2639
                                 inst.primary_node, old_file_storage_dir,
2640
                                 new_file_storage_dir))
2641

    
2642
      if not result[0]:
2643
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2644
                                 " (but the instance has been renamed in"
2645
                                 " Ganeti)" % (old_file_storage_dir,
2646
                                               new_file_storage_dir))
2647

    
2648
    _StartInstanceDisks(self.cfg, inst, None)
2649
    try:
2650
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2651
                                          "sda", "sdb"):
2652
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2653
               " instance has been renamed in Ganeti)" %
2654
               (inst.name, inst.primary_node))
2655
        logger.Error(msg)
2656
    finally:
2657
      _ShutdownInstanceDisks(inst, self.cfg)
2658

    
2659

    
2660
class LURemoveInstance(LogicalUnit):
2661
  """Remove an instance.
2662

2663
  """
2664
  HPATH = "instance-remove"
2665
  HTYPE = constants.HTYPE_INSTANCE
2666
  _OP_REQP = ["instance_name", "ignore_failures"]
2667

    
2668
  def BuildHooksEnv(self):
2669
    """Build hooks env.
2670

2671
    This runs on master, primary and secondary nodes of the instance.
2672

2673
    """
2674
    env = _BuildInstanceHookEnvByObject(self.instance)
2675
    nl = [self.sstore.GetMasterNode()]
2676
    return env, nl, nl
2677

    
2678
  def CheckPrereq(self):
2679
    """Check prerequisites.
2680

2681
    This checks that the instance is in the cluster.
2682

2683
    """
2684
    instance = self.cfg.GetInstanceInfo(
2685
      self.cfg.ExpandInstanceName(self.op.instance_name))
2686
    if instance is None:
2687
      raise errors.OpPrereqError("Instance '%s' not known" %
2688
                                 self.op.instance_name)
2689
    self.instance = instance
2690

    
2691
  def Exec(self, feedback_fn):
2692
    """Remove the instance.
2693

2694
    """
2695
    instance = self.instance
2696
    logger.Info("shutting down instance %s on node %s" %
2697
                (instance.name, instance.primary_node))
2698

    
2699
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2700
      if self.op.ignore_failures:
2701
        feedback_fn("Warning: can't shutdown instance")
2702
      else:
2703
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2704
                                 (instance.name, instance.primary_node))
2705

    
2706
    logger.Info("removing block devices for instance %s" % instance.name)
2707

    
2708
    if not _RemoveDisks(instance, self.cfg):
2709
      if self.op.ignore_failures:
2710
        feedback_fn("Warning: can't remove instance's disks")
2711
      else:
2712
        raise errors.OpExecError("Can't remove instance's disks")
2713

    
2714
    logger.Info("removing instance %s out of cluster config" % instance.name)
2715

    
2716
    self.cfg.RemoveInstance(instance.name)
2717

    
2718

    
2719
class LUQueryInstances(NoHooksLU):
2720
  """Logical unit for querying instances.
2721

2722
  """
2723
  _OP_REQP = ["output_fields", "names"]
2724

    
2725
  def CheckPrereq(self):
2726
    """Check prerequisites.
2727

2728
    This checks that the fields required are valid output fields.
2729

2730
    """
2731
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2732
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2733
                               "admin_state", "admin_ram",
2734
                               "disk_template", "ip", "mac", "bridge",
2735
                               "sda_size", "sdb_size", "vcpus"],
2736
                       dynamic=self.dynamic_fields,
2737
                       selected=self.op.output_fields)
2738

    
2739
    self.wanted = _GetWantedInstances(self, self.op.names)
2740

    
2741
  def Exec(self, feedback_fn):
2742
    """Computes the list of nodes and their attributes.
2743

2744
    """
2745
    instance_names = self.wanted
2746
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2747
                     in instance_names]
2748

    
2749
    # begin data gathering
2750

    
2751
    nodes = frozenset([inst.primary_node for inst in instance_list])
2752

    
2753
    bad_nodes = []
2754
    if self.dynamic_fields.intersection(self.op.output_fields):
2755
      live_data = {}
2756
      node_data = rpc.call_all_instances_info(nodes)
2757
      for name in nodes:
2758
        result = node_data[name]
2759
        if result:
2760
          live_data.update(result)
2761
        elif result == False:
2762
          bad_nodes.append(name)
2763
        # else no instance is alive
2764
    else:
2765
      live_data = dict([(name, {}) for name in instance_names])
2766

    
2767
    # end data gathering
2768

    
2769
    output = []
2770
    for instance in instance_list:
2771
      iout = []
2772
      for field in self.op.output_fields:
2773
        if field == "name":
2774
          val = instance.name
2775
        elif field == "os":
2776
          val = instance.os
2777
        elif field == "pnode":
2778
          val = instance.primary_node
2779
        elif field == "snodes":
2780
          val = list(instance.secondary_nodes)
2781
        elif field == "admin_state":
2782
          val = (instance.status != "down")
2783
        elif field == "oper_state":
2784
          if instance.primary_node in bad_nodes:
2785
            val = None
2786
          else:
2787
            val = bool(live_data.get(instance.name))
2788
        elif field == "status":
2789
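          # combine the configured state and the live state into one of:
          # running, ERROR_up, ERROR_down, ADMIN_down or ERROR_nodedown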
          if instance.primary_node in bad_nodes:
2790
            val = "ERROR_nodedown"
2791
          else:
2792
            running = bool(live_data.get(instance.name))
2793
            if running:
2794
              if instance.status != "down":
2795
                val = "running"
2796
              else:
2797
                val = "ERROR_up"
2798
            else:
2799
              if instance.status != "down":
2800
                val = "ERROR_down"
2801
              else:
2802
                val = "ADMIN_down"
2803
        elif field == "admin_ram":
2804
          val = instance.memory
2805
        elif field == "oper_ram":
2806
          if instance.primary_node in bad_nodes:
2807
            val = None
2808
          elif instance.name in live_data:
2809
            val = live_data[instance.name].get("memory", "?")
2810
          else:
2811
            val = "-"
2812
        elif field == "disk_template":
2813
          val = instance.disk_template
2814
        elif field == "ip":
2815
          val = instance.nics[0].ip
2816
        elif field == "bridge":
2817
          val = instance.nics[0].bridge
2818
        elif field == "mac":
2819
          val = instance.nics[0].mac
2820
        elif field == "sda_size" or field == "sdb_size":
2821
          disk = instance.FindDisk(field[:3])
2822
          if disk is None:
2823
            val = None
2824
          else:
2825
            val = disk.size
2826
        elif field == "vcpus":
2827
          val = instance.vcpus
2828
        else:
2829
          raise errors.ParameterError(field)
2830
        iout.append(val)
2831
      output.append(iout)
2832

    
2833
    return output
2834

    
2835

    
2836
class LUFailoverInstance(LogicalUnit):
2837
  """Failover an instance.
2838

2839
  """
2840
  HPATH = "instance-failover"
2841
  HTYPE = constants.HTYPE_INSTANCE
2842
  _OP_REQP = ["instance_name", "ignore_consistency"]
2843

    
2844
  def BuildHooksEnv(self):
2845
    """Build hooks env.
2846

2847
    This runs on master, primary and secondary nodes of the instance.
2848

2849
    """
2850
    env = {
2851
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2852
      }
2853
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2854
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2855
    return env, nl, nl
2856

    
2857
  def CheckPrereq(self):
2858
    """Check prerequisites.
2859

2860
    This checks that the instance is in the cluster.
2861

2862
    """
2863
    instance = self.cfg.GetInstanceInfo(
2864
      self.cfg.ExpandInstanceName(self.op.instance_name))
2865
    if instance is None:
2866
      raise errors.OpPrereqError("Instance '%s' not known" %
2867
                                 self.op.instance_name)
2868

    
2869
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2870
      raise errors.OpPrereqError("Instance's disk layout is not"
2871
                                 " network mirrored, cannot failover.")
2872

    
2873
    secondary_nodes = instance.secondary_nodes
2874
    if not secondary_nodes:
2875
      raise errors.ProgrammerError("no secondary node but using "
2876
                                   "a mirrored disk template")
2877

    
2878
    target_node = secondary_nodes[0]
2879
    # check memory requirements on the secondary node
2880
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2881
                         instance.name, instance.memory)
2882

    
2883
    # check bridge existence
2884
    brlist = [nic.bridge for nic in instance.nics]
2885
    if not rpc.call_bridges_exist(target_node, brlist):
2886
      raise errors.OpPrereqError("One or more target bridges %s does not"
2887
                                 " exist on destination node '%s'" %
2888
                                 (brlist, target_node))
2889

    
2890
    self.instance = instance
2891

    
2892
  def Exec(self, feedback_fn):
2893
    """Failover an instance.
2894

2895
    The failover is done by shutting it down on its present node and
2896
    starting it on the secondary.
2897

2898
    """
2899
    instance = self.instance
2900

    
2901
    source_node = instance.primary_node
2902
    target_node = instance.secondary_nodes[0]
2903

    
2904
    feedback_fn("* checking disk consistency between source and target")
2905
    for dev in instance.disks:
2906
      # for drbd, these are drbd over lvm
2907
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2908
        if instance.status == "up" and not self.op.ignore_consistency:
2909
          raise errors.OpExecError("Disk %s is degraded on target node,"
2910
                                   " aborting failover." % dev.iv_name)
2911

    
2912
    feedback_fn("* shutting down instance on source node")
2913
    logger.Info("Shutting down instance %s on node %s" %
2914
                (instance.name, source_node))
2915

    
2916
    if not rpc.call_instance_shutdown(source_node, instance):
2917
      if self.op.ignore_consistency:
2918
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2919
                     " anyway. Please make sure node %s is down"  %
2920
                     (instance.name, source_node, source_node))
2921
      else:
2922
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2923
                                 (instance.name, source_node))
2924

    
2925
    feedback_fn("* deactivating the instance's disks on source node")
2926
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2927
      raise errors.OpExecError("Can't shut down the instance's disks.")
2928

    
2929
    instance.primary_node = target_node
2930
    # distribute new instance config to the other nodes
2931
    self.cfg.AddInstance(instance)
2932

    
2933
    # Only start the instance if it's marked as up
2934
    if instance.status == "up":
2935
      feedback_fn("* activating the instance's disks on target node")
2936
      logger.Info("Starting instance %s on node %s" %
2937
                  (instance.name, target_node))
2938

    
2939
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2940
                                               ignore_secondaries=True)
2941
      if not disks_ok:
2942
        _ShutdownInstanceDisks(instance, self.cfg)
2943
        raise errors.OpExecError("Can't activate the instance's disks")
2944

    
2945
      feedback_fn("* starting the instance on the target node")
2946
      if not rpc.call_instance_start(target_node, instance, None):
2947
        _ShutdownInstanceDisks(instance, self.cfg)
2948
        raise errors.OpExecError("Could not start instance %s on node %s." %
2949
                                 (instance.name, target_node))
2950

    
2951

    
2952
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2953
  """Create a tree of block devices on the primary node.
2954

2955
  This always creates all devices.
2956

2957
  """
2958
  if device.children:
2959
    for child in device.children:
2960
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2961
        return False
2962

    
2963
  cfg.SetDiskID(device, node)
2964
  new_id = rpc.call_blockdev_create(node, device, device.size,
2965
                                    instance.name, True, info)
2966
  if not new_id:
2967
    return False
2968
  if device.physical_id is None:
2969
    device.physical_id = new_id
2970
  return True
2971

    
2972

    
2973
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


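# Illustrative note on the two helpers above (a sketch, not authoritative):
# for a network-mirrored disk the whole tree is created on the secondary
# because the top-level device is assumed to answer True to
# CreateOnSecondary(), which flips 'force' to True for its LV children;
# purely local LV trees never set 'force', so nothing is created on
# secondary nodes for them.
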
def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name per given
  extension, by prefixing each extension with a cluster-unique ID.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


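# Example (illustrative; the actual prefixes come from cfg.GenerateUniqueID()):
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#   => ["<unique-id-1>.sda", "<unique-id-2>.sdb"]
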
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


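# The device tree returned by one _GenerateDRBD8Branch() call looks roughly
# like this (sizes in MB; the 128 MB child is the hard-coded DRBD metadata
# volume):
#   LD_DRBD8 iv_name=<iv_name>, size=<size>,
#            logical_id=(primary, secondary, port)
#     +- LD_LV size=<size>, logical_id=(vgname, names[0])   # data
#     +- LD_LV size=128,    logical_id=(vgname, names[1])   # metadata
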
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


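# Illustrative result for the DT_DRBD8 branch above, with disk_sz=10240 and
# swap_sz=4096 (names shortened):
#   disks = [ DRBD8 "sda", 10240 MB, backed by <id>.sda_data / <id>.sda_meta,
#             DRBD8 "sdb",  4096 MB, backed by <id>.sdb_data / <id>.sdb_meta ]
# both mirrored between primary_node and secondary_nodes[0].
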
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


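# Example: for an instance named "instance1.example.com" the metadata text
# is "originstname+instance1.example.com".
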
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


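# Note on the creation order above: for every disk the device tree is first
# created on all secondary nodes (with force=False, i.e. only what
# CreateOnSecondary() requires), and only then, unconditionally, on the
# primary node.
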
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


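# Worked example (all values in MB):
#   _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096) == 14336
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096) == 14592
#     (the 256 extra covers the two 128 MB DRBD metadata volumes)
#   _ComputeDiskSize(constants.DT_FILE,  10240, 4096) is None (no VG space)
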
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

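  # Illustrative allocation request built above, for an opcode with
  # disk_size=10240, swap_size=4096, mac="auto" and bridge="xen-br0"
  # (example values only):
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
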
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

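  # Example of the extra hook environment exported above (illustrative
  # values):
  #   INSTANCE_DISK_TEMPLATE=drbd  INSTANCE_DISK_SIZE=10240
  #   INSTANCE_SWAP_SIZE=4096      INSTANCE_ADD_MODE=create
  # plus the INSTANCE_SRC_* variables when importing, and the generic
  # per-instance variables added by _BuildInstanceHookEnv().
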
  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")

    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


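# A hedged usage sketch for LUCreateInstance: the opcode is normally built by
# the command-line layer; field names follow _OP_REQP and the attributes used
# above, while the exact opcode class and the OS/node names are assumptions:
#   op = opcodes.OpCreateInstance(instance_name="instance1.example.com",
#                                 mem_size=512, disk_size=10240,
#                                 swap_size=4096,
#                                 disk_template=constants.DT_DRBD8,
#                                 mode=constants.INSTANCE_CREATE, start=True,
#                                 vcpus=1, wait_for_sync=True, ip_check=True,
#                                 mac="auto", os_type="debootstrap",
#                                 pnode="node1", snode="node2")
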
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


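# The command returned by LUConnectConsole.Exec() is meant to be executed by
# the caller on the master node: an ssh invocation to the instance's primary
# node, as root and with a forced tty, running whatever console command the
# hypervisor layer reports (under Xen this would be something along the lines
# of "xm console <instance>" -- illustrative only).
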
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.op.remote_node = self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

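  # Summary of the node roles computed in CheckPrereq above for DRBD8
  # (tgt_node is where the disks are replaced, oth_node is its peer):
  #   REPLACE_DISK_PRI: tgt_node = primary,   oth_node = secondary
  #   REPLACE_DISK_SEC: tgt_node = secondary, oth_node = primary,
  #                     new_node = optional new secondary (may be None)
  #   REPLACE_DISK_ALL is converted to REPLACE_DISK_SEC when a new secondary
  #   is given, and rejected otherwise.
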
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to <name>_replaced-<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name <name>_replaced-<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv,
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info(