Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 298fe380

History | View | Annotate | Download (163.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import simplejson
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47

    
48
# Check whether the simplejson module supports indentation.
# Old simplejson releases raise TypeError when dumps() is passed an
# ``indent`` keyword; probe once at import time.
try:
  simplejson.dumps(1, indent=2)
except TypeError:
  _JSON_INDENT = None
else:
  _JSON_INDENT = 2
54

    
55

    
56
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  # hooks path/type; HPATH None means BuildHooksEnv is never called
  HPATH = None
  HTYPE = None
  # names of opcode attributes that must be present and non-None
  _OP_REQP = []
  # whether an initialized cluster is required to run this LU
  REQ_CLUSTER = True
  # whether this LU may only be run on the master node
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    # lazily-built ssh.SshRunner, exposed through the 'ssh' property below
    self.__ssh = None

    # reject opcodes that lack any of the declared required parameters
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        # compare the configured master against this host's resolved name
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    The runner is created on first access and cached afterwards.

    """
    if not self.__ssh:
      # 'ssh' here resolves to the module-level import; the class
      # attribute 'ssh' defined below only shadows it on instances
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in the
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
167

    
168

    
169
class NoHooksLU(LogicalUnit):
  """Base class for logical units that never trigger hooks.

  Deriving from this class instead of LogicalUnit saves every
  hook-less LU from having to stub out the hooks machinery itself.

  """
  # HPATH None tells the hooks runner not to call BuildHooksEnv at all
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Return an empty hooks environment.

    Kept as a safe no-op: no variables, no pre-run nodes, no
    post-run nodes.

    """
    env = {}
    pre_nodes = []
    post_nodes = []
    return env, pre_nodes, post_nodes
186

    
187

    
188
def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  Resolves the given hostname and registers its IP in the hosts file
  under both the full and the short name.

  """
  host = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, host.ip, host.name,
                         [host.ShortName()])
194

    
195

    
196
def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  Drops the host's full name and then its short name from the
  hosts file.

  """
  host = utils.HostInfo(name=hostname)
  for alias in (host.name, host.ShortName()):
    utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, alias)
203

    
204

    
205
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  Returns:
    A nicely-sorted list of fully-expanded node names.

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  # empty/None means "every node in the cluster"
  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  wanted = []
  for name in nodes:
    expanded = lu.cfg.ExpandNodeName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(expanded)
  return utils.NiceSort(wanted)
227

    
228

    
229
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  Returns:
    A nicely-sorted list of fully-expanded instance names.

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # empty/None means "every instance in the cluster"
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for name in instances:
    expanded = lu.cfg.ExpandInstanceName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name)
    wanted.append(expanded)
  return utils.NiceSort(wanted)
251

    
252

    
253
def _CheckOutputFields(static, dynamic, selected):
254
  """Checks whether all selected fields are valid.
255

256
  Args:
257
    static: Static fields
258
    dynamic: Dynamic fields
259

260
  """
261
  static_fields = frozenset(static)
262
  dynamic_fields = frozenset(dynamic)
263

    
264
  all_fields = static_fields | dynamic_fields
265

    
266
  if not all_fields.issuperset(selected):
267
    raise errors.OpPrereqError("Unknown output fields selected: %s"
268
                               % ",".join(frozenset(selected).
269
                                          difference(all_fields)))
270

    
271

    
272
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
273
                          memory, vcpus, nics):
274
  """Builds instance related env variables for hooks from single variables.
275

276
  Args:
277
    secondary_nodes: List of secondary nodes as strings
278
  """
279
  env = {
280
    "OP_TARGET": name,
281
    "INSTANCE_NAME": name,
282
    "INSTANCE_PRIMARY": primary_node,
283
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
284
    "INSTANCE_OS_TYPE": os_type,
285
    "INSTANCE_STATUS": status,
286
    "INSTANCE_MEMORY": memory,
287
    "INSTANCE_VCPUS": vcpus,
288
  }
289

    
290
  if nics:
291
    nic_count = len(nics)
292
    for idx, (ip, bridge, mac) in enumerate(nics):
293
      if ip is None:
294
        ip = ""
295
      env["INSTANCE_NIC%d_IP" % idx] = ip
296
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
297
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
298
  else:
299
    nic_count = 0
300

    
301
  env["INSTANCE_NIC_COUNT"] = nic_count
302

    
303
  return env
304

    
305

    
306
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns:
    The hook environment dict built by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # bug fix: this used to read instance.os (a copy-paste of the
    # os_type line above), which exported the OS name in
    # INSTANCE_STATUS instead of the instance's run state
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
326

    
327

    
328
def _HasValidVG(vglist, vgname):
329
  """Checks if the volume group list is valid.
330

331
  A non-None return value means there's an error, and the return value
332
  is the error message.
333

334
  """
335
  vgsize = vglist.get(vgname, None)
336
  if vgsize is None:
337
    return "volume group '%s' missing" % vgname
338
  elif vgsize < 20480:
339
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
340
            (vgname, vgsize))
341
  return None
342

    
343

    
344
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # move any pre-existing key material out of the way before regenerating
  for key_file in (priv_key, pub_key):
    if os.path.exists(key_file):
      utils.CreateBackup(key_file)
    utils.RemoveFile(key_file)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  # authorize the freshly generated public key
  pub_file = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, pub_file.read(8192))
  finally:
    pub_file.close()
374

    
375

    
376
def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  Args:
    ss: the ssconf.SimpleStore instance to write the password into

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  # generate a self-signed certificate valid for five years; key and
  # certificate are written into the same file
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  # owner read-only: the file contains the private key as well
  os.chmod(constants.SSL_CERT_FILE, 0400)

  # restart the node daemon so it picks up the new password/certificate
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))
405

    
406

    
407
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  Raises errors.OpPrereqError if any bridge used by the instance's
  NICs is missing on its primary node.

  """
  # collect the bridge of every NIC and verify them in a single RPC
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
417

    
418

    
419
class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  This LU can run without an existing cluster (REQ_CLUSTER = False);
  it validates the host/cluster names, storage and network settings,
  then writes the simple store, certificates and initial config.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        # NOTE(review): adjacent string literals concatenate without a
        # space, so this renders as "...VNCpassword file..." - confirm
        # and fix the message separately
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   "password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    # resolve our own name/IP; also stored for BuildHooksEnv and Exec
    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    # our own resolved IP must be reachable from this very host
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    # the cluster IP must not be in use yet
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    # optional secondary (replication) IP: if given it must be valid
    # and must belong to this host
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    # create the file storage directory if needed (mode 0750)
    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    # MAC prefix must look like "aa:00:01"
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    # the master network device must exist on this host
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    Writes the simple store keys, generates the node daemon
    password/certificate, starts the master IP, sets up ssh and
    /etc/hosts, and finally creates the initial cluster config.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    # second field of the pub key line is the key material itself
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
571

    
572

    
573
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the cluster can be destroyed.

    The cluster must contain nothing but the master node and no
    instances at all; any violation is signalled by raising
    errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodes = self.cfg.GetNodeList()
    if len(nodes) != 1 or nodes[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1))

    instances = self.cfg.GetInstanceList()
    if instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instances))

  def Exec(self, feedback_fn):
    """Tear down the (now empty) cluster.

    Stops the master role, backs up the ganeti ssh keys and makes the
    master node leave the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    for key_file in (priv_key, pub_key):
      utils.CreateBackup(key_file)

    rpc.call_node_leave_cluster(master)
609

    
610

    
611
class LUVerifyCluster(NoHooksLU):
612
  """Verifies the cluster status.
613

614
  """
615
  _OP_REQP = ["skip_checks"]
616

    
617
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums
      vglist: volume group data returned by the node
      node_result: dict returned by the node_verify RPC
      remote_version: protocol version reported by the node
      feedback_fn: callable used to report each problem found

    Returns True if any problem was detected, False otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      # no version data at all means we could not talk to the node;
      # nothing further can be checked
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE(review): this loop variable shadows the 'node' parameter;
        # harmless today since the parameter is not used afterwards, but
        # worth renaming
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      # NOTE(review): a hypervisor failure is reported as ERROR but does
      # not set 'bad' - confirm whether this is intentional
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad
686

    
687
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688
                      node_instance, feedback_fn):
689
    """Verify an instance.
690

691
    This function checks to see if the required block devices are
692
    available on the instance's node.
693

694
    """
695
    bad = False
696

    
697
    node_current = instanceconfig.primary_node
698

    
699
    node_vol_should = {}
700
    instanceconfig.MapLVsByNode(node_vol_should)
701

    
702
    for node in node_vol_should:
703
      for volume in node_vol_should[node]:
704
        if node not in node_vol_is or volume not in node_vol_is[node]:
705
          feedback_fn("  - ERROR: volume %s missing on node %s" %
706
                          (volume, node))
707
          bad = True
708

    
709
    if not instanceconfig.status == 'down':
710
      if (node_current not in node_instance or
711
          not instance in node_instance[node_current]):
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if (not node == node_current):
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Args:
      node_info: dict built in Exec, keyed by node name, with 'mfree'
        and 'sinst-by-pnode' entries
      instance_cfg: dict mapping instance names to their config objects
      feedback_fn: callable used to report each problem found

    Returns True if any node lacks the memory to absorb a failover.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        # sum the memory of every instance that would fail over here
        # if 'prinode' went down
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
784

    
785
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and
    verify that every requested skip is one of the optional checks.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not self.skip_set.issubset(constants.VERIFY_OPTIONAL_CHECKS):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
795

    
796
  def Exec(self, feedback_fn):
797
    """Verify integrity of cluster, performing various test on nodes.
798

799
    """
800
    bad = False
801
    feedback_fn("* Verifying global settings")
802
    for msg in self.cfg.VerifyConfig():
803
      feedback_fn("  - ERROR: %s" % msg)
804

    
805
    vg_name = self.cfg.GetVGName()
806
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
807
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
808
    i_non_redundant = [] # Non redundant instances
809
    node_volume = {}
810
    node_instance = {}
811
    node_info = {}
812
    instance_cfg = {}
813

    
814
    # FIXME: verify OS list
815
    # do local checksums
816
    file_names = list(self.sstore.GetFileList())
817
    file_names.append(constants.SSL_CERT_FILE)
818
    file_names.append(constants.CLUSTER_CONF_FILE)
819
    local_checksums = utils.FingerprintFiles(file_names)
820

    
821
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823
    all_instanceinfo = rpc.call_instance_list(nodelist)
824
    all_vglist = rpc.call_vg_list(nodelist)
825
    node_verify_param = {
826
      'filelist': file_names,
827
      'nodelist': nodelist,
828
      'hypervisor': None,
829
      }
830
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831
    all_rversion = rpc.call_version(nodelist)
832
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
833

    
834
    for node in nodelist:
835
      feedback_fn("* Verifying node %s" % node)
836
      result = self._VerifyNode(node, file_names, local_checksums,
837
                                all_vglist[node], all_nvinfo[node],
838
                                all_rversion[node], feedback_fn)
839
      bad = bad or result
840

    
841
      # node_volume
842
      volumeinfo = all_volumeinfo[node]
843

    
844
      if isinstance(volumeinfo, basestring):
845
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
846
                    (node, volumeinfo[-400:].encode('string_escape')))
847
        bad = True
848
        node_volume[node] = {}
849
      elif not isinstance(volumeinfo, dict):
850
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
851
        bad = True
852
        continue
853
      else:
854
        node_volume[node] = volumeinfo
855

    
856
      # node_instance
857
      nodeinstance = all_instanceinfo[node]
858
      if type(nodeinstance) != list:
859
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
860
        bad = True
861
        continue
862

    
863
      node_instance[node] = nodeinstance
864

    
865
      # node_info
866
      nodeinfo = all_ninfo[node]
867
      if not isinstance(nodeinfo, dict):
868
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
869
        bad = True
870
        continue
871

    
872
      try:
873
        node_info[node] = {
874
          "mfree": int(nodeinfo['memory_free']),
875
          "dfree": int(nodeinfo['vg_free']),
876
          "pinst": [],
877
          "sinst": [],
878
          # dictionary holding all instances this node is secondary for,
879
          # grouped by their primary node. Each key is a cluster node, and each
880
          # value is a list of instances which have the key as primary and the
881
          # current node as secondary.  this is handy to calculate N+1 memory
882
          # availability if you can only failover from a primary to its
883
          # secondary.
884
          "sinst-by-pnode": {},
885
        }
886
      except ValueError:
887
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
888
        bad = True
889
        continue
890

    
891
    node_vol_should = {}
892

    
893
    for instance in instancelist:
894
      feedback_fn("* Verifying instance %s" % instance)
895
      inst_config = self.cfg.GetInstanceInfo(instance)
896
      result =  self._VerifyInstance(instance, inst_config, node_volume,
897
                                     node_instance, feedback_fn)
898
      bad = bad or result
899

    
900
      inst_config.MapLVsByNode(node_vol_should)
901

    
902
      instance_cfg[instance] = inst_config
903

    
904
      pnode = inst_config.primary_node
905
      if pnode in node_info:
906
        node_info[pnode]['pinst'].append(instance)
907
      else:
908
        feedback_fn("  - ERROR: instance %s, connection to primary node"
909
                    " %s failed" % (instance, pnode))
910
        bad = True
911

    
912
      # If the instance is non-redundant we cannot survive losing its primary
913
      # node, so we are not N+1 compliant. On the other hand we have no disk
914
      # templates with more than one secondary so that situation is not well
915
      # supported either.
916
      # FIXME: does not support file-backed instances
917
      if len(inst_config.secondary_nodes) == 0:
918
        i_non_redundant.append(instance)
919
      elif len(inst_config.secondary_nodes) > 1:
920
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
921
                    % instance)
922

    
923
      for snode in inst_config.secondary_nodes:
924
        if snode in node_info:
925
          node_info[snode]['sinst'].append(instance)
926
          if pnode not in node_info[snode]['sinst-by-pnode']:
927
            node_info[snode]['sinst-by-pnode'][pnode] = []
928
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
929
        else:
930
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
931
                      " %s failed" % (instance, snode))
932

    
933
    feedback_fn("* Verifying orphan volumes")
934
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
935
                                       feedback_fn)
936
    bad = bad or result
937

    
938
    feedback_fn("* Verifying remaining instances")
939
    result = self._VerifyOrphanInstances(instancelist, node_instance,
940
                                         feedback_fn)
941
    bad = bad or result
942

    
943
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
944
      feedback_fn("* Verifying N+1 Memory redundancy")
945
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
946
      bad = bad or result
947

    
948
    feedback_fn("* Other Notes")
949
    if i_non_redundant:
950
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
951
                  % len(i_non_redundant))
952

    
953
    return int(bad)
954

    
955

    
956
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns a 4-tuple (unreachable nodes, per-node LVM errors, instances
    with inactive LVs, instances with missing LVs); the named locals
    below alias the tuple members so they can be filled in place.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # build the map of LVs we expect to find: only running,
    # network-mirrored instances are of interest here
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # FIX: must skip this node; a string result has no .iteritems()
        # and would crash the loop below with AttributeError
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      # pop each LV we actually found; whatever remains in nv_dict
      # afterwards is missing on its node
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
1026

    
1027

    
1028
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node, both pre and post.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Resolves the new name, rejects a no-op rename, and makes sure the
    new IP is not already live on the network.  Side effects: stores the
    resolved IP in self.ip and the canonical name back into self.op.name.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # fping succeeding means something already answers on the new IP,
      # so taking it over would create an address conflict
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    Stops the master role, updates the ssconf keys, pushes the updated
    files to all other nodes, and (in a finally block, so it happens even
    on failure) restarts the master role.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          # per-node failures are only logged: the rename proceeds
          # best-effort and the out-of-date node can be fixed later
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to bring the master role back up, even if the
      # distribution above failed halfway through
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1105

    
1106

    
1107
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # depth-first: any LVM-backed descendant makes the whole tree LVM-based
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  # no child matched; the answer is decided by this device itself
  return disk.dev_type == constants.LD_LV
1122

    
1123

    
1124
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      # disabling LVM storage: refuse while any instance still has an
      # lvm-backed disk anywhere in its disk tree
      for iname in self.cfg.GetInstanceList():
        inst = self.cfg.GetInstanceInfo(iname)
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    new_vg = self.op.vg_name
    if new_vg == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(new_vg)
1178

    
1179

    
1180
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: the config writer, used to set the physical disk IDs
    proc: processor-like object providing LogInfo/LogWarning feedback
    oneshot: if True, report status once and return instead of waiting
    unlock: if True, release the 'cmd' lock while sleeping between polls

  Returns:
    True if no device ended up degraded, False otherwise.
  Raises errors.RemoteError after 10 consecutive failed status queries.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  # status is always queried on the primary node
  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  # retries counts *consecutive* RPC failures; it is reset on success
  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        # a single unreadable device is only warned about; the loop
        # keeps checking the remaining disks
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # degraded with no sync in progress means it will not self-heal
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # perc_done set => resync still running on this device
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # drop the command lock while sleeping so other jobs can run,
    # re-acquiring it afterwards even if sleep is interrupted
    if unlock:
      utils.Unlock('cmd')
    try:
      # NOTE(review): if no device reported an estimate, max_time stays 0
      # and this is sleep(0), i.e. near-busy polling — confirm intended
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1242

    
1243

    
1244
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  Returns True if this device (and, recursively, all its children)
  reports a healthy status; False on degradation, missing device or an
  unreachable node.

  """
  cfgw.SetDiskID(dev, node)
  # index into the blockdev_find result tuple: 5 = is_degraded,
  # 6 = ldisk (local storage) status
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  # only query devices that are actually assembled on this node
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      # NOTE(review): ldisk is not forwarded here, so children are always
      # checked via is_degraded regardless of the caller's flag — confirm
      # this is intentional
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
1271

    
1272

    
1273
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      # FIX: loop variable renamed from 'os', which shadowed the imported
      # 'os' module inside this method
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    Returns one row per OS name, with one entry per requested output
    field; raises errors.ParameterError on an unknown field.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    # FIX: identity test instead of '==' — the RPC layer signals total
    # failure with the False singleton; '== False' would also match 0
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # an OS is valid only if every node reported a usable first entry
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1351

    
1352

    
1353
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allows itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: use call-style raise; the old 'raise Exc, (args)' statement
      # form was inconsistent with every other raise in this module
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    # store the canonical name and the node object for Exec
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    Tells the node to leave, stops its daemon, removes it from the
    configuration and from /etc/hosts.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
1426

    
1427

    
1428
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns one row per wanted node, one entry per requested field;
    unknown fields raise errors.ParameterError.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    # only issue the (expensive) node_info RPC if a live field was asked for
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      # FIX: dict.fromkeys(nodenames, {}) made every key share one dict
      # object; build an independent (empty) dict per node instead
      live_data = dict([(name, {}) for name in nodenames])

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # instance membership is only computed when an instance field was asked
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1524

    
1525

    
1526
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  @staticmethod
  def _VolumeOwner(ilist, lv_by_node, node, lv_name):
    """Return the name of the first instance owning the LV, or '-'."""
    for inst in ilist:
      node_lvs = lv_by_node[inst]
      if node in node_lvs and lv_name in node_lvs[node]:
        return inst.name
    return '-'

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    wanted = self.nodes
    volumes = rpc.call_node_volumes(wanted)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    # per-instance map of node -> list of its LV names
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in wanted:
      # skip nodes that did not answer or reported nothing
      node_vols = volumes.get(node, None)
      if not node_vols:
        continue

      # sort a copy by physical device so the RPC result stays untouched
      node_vols = node_vols[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            val = self._VolumeOwner(ilist, lv_by_node, node, vol['name'])
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
1594

    
1595

    
1596
class LUAddNode(LogicalUnit):
1597
  """Logical unit for adding node to the cluster.
1598

1599
  """
1600
  HPATH = "node-add"
1601
  HTYPE = constants.HTYPE_NODE
1602
  _OP_REQP = ["node_name"]
1603

    
1604
  def BuildHooksEnv(self):
1605
    """Build hooks env.
1606

1607
    This will run on all nodes before, and on all nodes + the new node after.
1608

1609
    """
1610
    env = {
1611
      "OP_TARGET": self.op.node_name,
1612
      "NODE_NAME": self.op.node_name,
1613
      "NODE_PIP": self.op.primary_ip,
1614
      "NODE_SIP": self.op.secondary_ip,
1615
      }
1616
    nodes_0 = self.cfg.GetNodeList()
1617
    nodes_1 = nodes_0 + [self.op.node_name, ]
1618
    return env, nodes_0, nodes_1
1619

    
1620
  def CheckPrereq(self):
1621
    """Check prerequisites.
1622

1623
    This checks:
1624
     - the new node is not already in the config
1625
     - it is resolvable
1626
     - its parameters (single/dual homed) matches the cluster
1627

1628
    Any errors are signalled by raising errors.OpPrereqError.
1629

1630
    """
1631
    node_name = self.op.node_name
1632
    cfg = self.cfg
1633

    
1634
    dns_data = utils.HostInfo(node_name)
1635

    
1636
    node = dns_data.name
1637
    primary_ip = self.op.primary_ip = dns_data.ip
1638
    secondary_ip = getattr(self.op, "secondary_ip", None)
1639
    if secondary_ip is None:
1640
      secondary_ip = primary_ip
1641
    if not utils.IsValidIP(secondary_ip):
1642
      raise errors.OpPrereqError("Invalid secondary IP given")
1643
    self.op.secondary_ip = secondary_ip
1644
    node_list = cfg.GetNodeList()
1645
    if node in node_list:
1646
      raise errors.OpPrereqError("Node %s is already in the configuration"
1647
                                 % node)
1648

    
1649
    for existing_node_name in node_list:
1650
      existing_node = cfg.GetNodeInfo(existing_node_name)
1651
      if (existing_node.primary_ip == primary_ip or
1652
          existing_node.secondary_ip == primary_ip or
1653
          existing_node.primary_ip == secondary_ip or
1654
          existing_node.secondary_ip == secondary_ip):
1655
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1656
                                   " existing node %s" % existing_node.name)
1657

    
1658
    # check that the type of the node (single versus dual homed) is the
1659
    # same as for the master
1660
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1661
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1662
    newbie_singlehomed = secondary_ip == primary_ip
1663
    if master_singlehomed != newbie_singlehomed:
1664
      if master_singlehomed:
1665
        raise errors.OpPrereqError("The master has no private ip but the"
1666
                                   " new node has one")
1667
      else:
1668
        raise errors.OpPrereqError("The master has a private ip but the"
1669
                                   " new node doesn't have one")
1670

    
1671
    # checks reachablity
1672
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1673
      raise errors.OpPrereqError("Node not reachable by ping")
1674

    
1675
    if not newbie_singlehomed:
1676
      # check reachability from my secondary ip to newbie's secondary ip
1677
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1678
                           source=myself.secondary_ip):
1679
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1680
                                   " based ping to noded port")
1681

    
1682
    self.new_node = objects.Node(name=node,
1683
                                 primary_ip=primary_ip,
1684
                                 secondary_ip=secondary_ip)
1685

    
1686
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1687
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1688
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1689
                                   constants.VNC_PASSWORD_FILE)
1690

    
1691
  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    The sequence is: push the node daemon password and SSL certificate
    to the new node over ssh and restart its daemon, verify the daemon
    answers with a matching protocol version, copy the ssh host/user
    keys, update /etc/hosts, have the node confirm its secondary IP and
    hostname, distribute the cluster files, and finally register the
    node in the configuration.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    # the password is embedded in a shell command below, so reject
    # anything outside this safe character set
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    # NOTE(review): fixed sleep to let the restarted daemon come up
    # before the version query below -- inherently racy
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    # order matters: call_node_add below consumes the keys positionally
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    # ask the new node itself to confirm it can bind/reach the
    # secondary IP it was configured with
    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          # best-effort: log the failure but keep distributing
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    # copy the simple store files (plus the VNC password file on
    # HVM clusters) to the new node only
    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
    
1817
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      # fixed grammar in the user-visible message ("This commands")
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # fixed log message: was "could disable the master role"
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # record the new master in the simple store and distribute it, so
    # that all nodes agree on who the master is
    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
1888
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  This LU may run on any node, hence REQ_MASTER is off.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the cluster configuration as a dict.

    """
    sstore = self.sstore
    info = {}
    info["name"] = sstore.GetClusterName()
    info["software_version"] = constants.RELEASE_VERSION
    info["protocol_version"] = constants.PROTOCOL_VERSION
    info["config_version"] = constants.CONFIG_VERSION
    info["os_api_version"] = constants.OS_API_VERSION
    info["export_version"] = constants.EXPORT_VERSION
    info["master"] = sstore.GetMasterNode()
    info["architecture"] = (platform.architecture()[0], platform.machine())
    return info
1919
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies that the source file exists and expands the node list.

    """
    filename = self.op.filename
    if not os.path.exists(filename):
      raise errors.OpPrereqError("No such filename '%s'" % filename)
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to the selected nodes.

    The local node is skipped (the file is already there); per-node
    copy failures are logged but do not abort the operation.

    """
    filename = self.op.filename
    myname = utils.HostInfo().name
    for target in self.nodes:
      if target == myname:
        continue
      if not self.ssh.CopyFileToNode(target, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, target))
1958
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites to check for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the serialized cluster configuration.

    """
    return self.cfg.DumpConfig()
1977
class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run the command on the selected nodes.

    Returns a list of (node, output, exit_code) tuples.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    results = []
    for node_name in self.nodes:
      cmd_result = self.ssh.Run(node_name, "root", self.op.command)
      results.append((node_name, cmd_result.output, cmd_result.exit_code))
    return results
2009
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(expanded_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    Returns the device mapping produced by _AssembleInstanceDisks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")
    return disks_info
2040
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes: first everywhere in
  non-primary mode, then a second pass on the primary node only, to
  narrow the DRBD race window described below.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: a ConfigWriter instance
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a (disks_ok, device_info) tuple; disks_ok is False if any assembly
    failed (subject to ignore_secondaries), and device_info is a list
    of (primary_node, iv_name, assemble_result) entries, one per disk
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        # secondary failures may be tolerated by the caller
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        # primary failures are always fatal
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
2102
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  Assembles all of the instance's disks; on failure, any devices
  already assembled are shut down again and OpExecError is raised.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if disks_ok:
    return
  # assembly failed: clean up whatever was brought up, hint the user
  # about --force when applicable, and abort
  _ShutdownInstanceDisks(instance, cfg)
  if force is not None and not force:
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
2116
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    Refuses to shut down the block devices while the instance is
    still reported as running on its primary node.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # a non-list answer means the node could not be contacted;
    # idiom fix: isinstance() instead of "type(x) is list"
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
2153
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, shutdown errors on the primary node are
  still logged but do not affect the return value; failures on any
  other node (or on the primary when ignore_primary is false) make
  the result False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # only failures outside the ignore_primary exemption flip the result
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
2174
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  # robustness fix: also treat a missing per-node entry as "could not
  # contact" instead of letting nodeinfo[node] raise KeyError below
  if not nodeinfo or not isinstance(nodeinfo, dict) or node not in nodeinfo:
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
2204
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its bridges
    exist, and that the primary node has enough free memory for it.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    # normalize the opcode name to the expanded instance name
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # NOTE(review): the config is marked 'up' before the actual start;
    # presumably intentional so a failed start is still treated as an
    # up instance by other tools -- confirm
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      # roll back disk activation if the start itself failed
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
2267
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  # reboot_type must be one of constants.INSTANCE_REBOOT_*; it is
  # validated in Exec below
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    # normalize the opcode name to the expanded instance name
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft and hard reboots are delegated to the node daemon; a full
    reboot is implemented here as shutdown + disk restart + start.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    # NOTE(review): this validation could arguably live in CheckPrereq
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: stop the instance, cycle its disks, start it again
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
2343
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(expanded_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    Marks the instance down in the configuration, then stops it on
    its primary node and shuts down its block devices; a failed stop
    is only logged, the disks are torn down regardless.

    """
    instance = self.instance
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      logger.Error("could not shutdown instance")
    _ShutdownInstanceDisks(instance, self.cfg)
2388
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # bug fix: this opcode has no 'pnode' attribute, so the old
        # "% self.op.pnode" would raise AttributeError while trying to
        # report the error; use the instance's primary node instead
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    Optionally switches the instance's OS, then re-runs the OS create
    scripts on the primary node with the disks activated.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always tear the disks down again, even on failure
      _ShutdownInstanceDisks(inst, self.cfg)
2467
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name resolves and is not already taken.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      # consistency fix: use utils.TcpPing, as the rest of this module
      # does, instead of shelling out to the external fping binary
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    Renames it in the configuration, moves file-based storage if
    needed, and runs the OS rename script on the primary node.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # fixed log message: was "Could run OS rename script..."
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
2571
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  Shuts the instance down, removes its disks and finally drops it from
  the cluster configuration.  When the opcode's ignore_failures flag is
  set, shutdown/disk-removal failures only produce warnings.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  # ignore_failures is read in Exec(); requiring it here makes a missing
  # attribute fail during opcode validation instead of mid-removal
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    Each step (shutdown, disk removal) either aborts the operation or,
    if self.op.ignore_failures is set, warns and continues.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    # the instance is only dropped from the configuration once all the
    # physical cleanup above has been attempted
    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  Supports static fields (read from the configuration) and dynamic
  fields (oper_state, oper_ram, status) that require live RPC data.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns a list (one entry per wanted instance) of lists of field
    values, in the order given by self.op.output_fields.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    # only query the nodes if at least one live field was requested
    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result is False:
          # the False sentinel marks an unreachable node; use an
          # identity check so other falsy values are not misclassified
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combined admin/operational state, or an error marker when
          # the two disagree or the node is unreachable
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Moves an instance from its primary node to its (network-mirrored)
  secondary by shutting it down on the primary and starting it on the
  secondary.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      # message fixed: the guard above allows any DTS_NET_MIRROR
      # template, not just the old remote_raid1 one
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  Children are created first (recursively), then the device itself; on
  the primary node every device is always created.

  Returns True on success, False as soon as any creation fails.

  """
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, True, info)
  if not created_id:
    return False
  # remember the physical id the node assigned, unless one is set already
  if device.physical_id is None:
    device.physical_id = created_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  # a device that must exist on secondaries forces creation of itself
  # and of everything below it
  force = device.CreateOnSecondary() or force
  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      return False

  if not force:
    # nothing to create at this level
    return True
  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, False, info)
  if not created_id:
    return False
  # remember the physical id the node assigned, unless one is set already
  if device.physical_id is None:
    device.physical_id = created_id
  return True


def _GenerateUniqueNames(cfg, exts):
2914
  """Generate a suitable LV name.
2915

2916
  This will generate a logical volume name for the given instance.
2917

2918
  """
2919
  results = []
2920
  for val in exts:
2921
    new_id = cfg.GenerateUniqueID()
2922
    results.append("%s%s" % (new_id, val))
2923
  return results
2924

    
2925

    
2926
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  The device carries two LV children: the data volume (full size) and
  a fixed 128MB metadata volume.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  children = [
    objects.Disk(dev_type=constants.LD_LV, size=size,
                 logical_id=(vgname, names[0])),
    objects.Disk(dev_type=constants.LD_LV, size=128,
                 logical_id=(vgname, names[1])),
    ]
  return objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                      logical_id=(primary, secondary, port),
                      children=children)


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  The device carries two LV children: the data volume (full size) and
  a fixed 128MB metadata volume.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  children = [
    objects.Disk(dev_type=constants.LD_LV, size=size,
                 logical_id=(vgname, names[0])),
    objects.Disk(dev_type=constants.LD_LV, size=128,
                 logical_id=(vgname, names[1])),
    ]
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port),
                      children=children, iv_name=iv_name)


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Returns the list of top-level Disk objects (sda/sdb) appropriate for
  the template, or raises ProgrammerError on a bad template/secondary
  combination.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    return [
      objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                   logical_id=(vgname, names[0]), iv_name="sda"),
      objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                   logical_id=(vgname, names[1]), iv_name="sdb"),
      ]

  if template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    return [
      _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                           disk_sz, names[0:2], "sda"),
      _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                           swap_sz, names[2:4], "sdb"),
      ]

  if template_name == constants.DT_FILE:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    return [
      objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                   iv_name="sda", logical_id=(file_driver,
                   "%s/sda" % file_storage_dir)),
      objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                   iv_name="sdb", logical_id=(file_driver,
                   "%s/sdb" % file_storage_dir)),
      ]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)


def _GetInstanceInfoText(instance):
3011
  """Compute that text that should be added to the disk's metadata.
3012

3013
  """
3014
  return "originstname+%s" % instance.name
3015

    
3016

    
3017
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  # file-based instances need their storage directory created first
  if instance.disk_template == constants.DT_FILE:
    storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    dir_result = rpc.call_file_storage_dir_create(instance.primary_node,
                                                  storage_dir)

    if not dir_result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not dir_result[0]:
      logger.Error("failed to create directory '%s'" % storage_dir)
      return False

  for disk in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (disk.iv_name, instance.name))
    #HARDCODE
    # secondaries are handled before the primary node
    for snode in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, snode, instance,
                                        disk, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (disk.iv_name, disk, snode))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, disk, info):
      logger.Error("failed to create volume %s on primary!" %
                   disk.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  all_removed = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        # best effort: log the failure but keep removing the rest
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        all_removed = False

  # file-based instances also carry a storage directory to remove
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      all_removed = False

  return all_removed


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  template_size = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
    }

  try:
    return template_size[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)


class LUCreateInstance(LogicalUnit):
3124
  """Create an instance.
3125

3126
  """
3127
  HPATH = "instance-add"
3128
  HTYPE = constants.HTYPE_INSTANCE
3129
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
3130
              "disk_template", "swap_size", "mode", "start", "vcpus",
3131
              "wait_for_sync", "ip_check", "mac"]
3132

    
3133
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    # creation-specific parameters
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    # import-only parameters
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    # common per-instance parameters
    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
      ))

    node_list = [self.sstore.GetMasterNode(), self.op.pnode]
    node_list.extend(self.secondaries)
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    Validates the creation mode, the (optional) import source, primary
    and secondary nodes, disk template, free disk space, OS, instance
    name/IP, MAC, bridge and HVM boot order, and stores the derived
    values (self.pnode, self.secondaries, self.inst_ip, ...) for later
    phases.

    """
    # optional opcode attributes default to None
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          # fixed: interpolate the node name, not the whole nodeinfo dict
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip handling: None/"none" disables it, "auto" resolves it, anything
    # else must be a valid address
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    This allocates the NIC (MAC/IP/bridge), the network port (for
    hypervisors that need one), generates the disk set, registers the
    instance in the configuration, waits for the disks to sync,
    creates or imports the OS and finally (optionally) starts the
    instance.

    Args:
      feedback_fn: callable used to report progress to the caller

    Raises:
      errors.OpExecError: if disk creation, sync, OS setup or instance
        start fails (partially-created disks are removed on failure)

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    # MAC: either generate a fresh one or use the user-supplied value
    # (validated in CheckPrereq)
    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # some hypervisors (see HTS_REQ_PORT) need a dedicated network port
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any devices that were created before the failure
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # degraded disks: undo both the disk creation and the config entry
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    Raises:
      errors.OpPrereqError: if the instance name does not resolve to a
        known instance

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    Returns the ssh command line (as built by self.ssh.BuildCmd) that,
    when run on the master node, attaches to the instance's console on
    its primary node.

    Raises:
      errors.OpExecError: if the primary node cannot be contacted or
        the instance is not running there

    """
    instance = self.instance
    node = instance.primary_node

    # a False result means the RPC to the node itself failed
    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
class LUReplaceDisks(LogicalUnit):
3506
  """Replace the disks of an instance.
3507

3508
  """
3509
  HPATH = "mirrors-replace"
3510
  HTYPE = constants.HTYPE_INSTANCE
3511
  _OP_REQP = ["instance_name", "mode", "disks"]
3512

    
3513
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    Returns:
      (env, nl, nl) where env is the hook environment dict (replace
      mode plus old/new secondary node names, merged with the standard
      per-instance variables) and nl is the node list the hooks run on
      (master, primary, and the new remote node if one was given).

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    # remote_node may be None (no secondary change requested)
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    It also validates the disk template (must be network mirrored),
    the secondary node layout, the optional remote (new secondary)
    node, and normalizes self.op.mode for drbd8; on success it sets
    self.instance, self.sec_node, self.remote_node_info and, for
    drbd8, self.tgt_node/self.oth_node (and self.new_node in
    secondary-replace mode).

    Raises:
      errors.OpPrereqError: on any failed validation

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # all requested disk names must exist on the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node
  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    This is the remote_raid1 (md/drbd7) variant: for each disk a new
    drbd mirror component is created on the (possibly new) secondary
    and on the primary, added to the md device, synced, and finally
    the old component is removed from the mirror and deleted.

    Args:
      feedback_fn: callable used to report progress (unused here; the
        method logs via logger instead)

    Raises:
      errors.OpExecError: if device creation fails or a device stays
        degraded after sync; cleanup on failure is partly manual, as
        the error messages indicate

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      # remember (md device, old component, new component) per disk
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        # best-effort rollback of the two newly created components
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the blockdev_find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    # all synced and healthy: drop the old components
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    Args:
      feedback_fn: callable used to report progress (unused here; the
        method reports via self.proc.LogStep/LogInfo/LogWarning)

    Raises:
      errors.OpExecError: on any failed node RPC or degraded device

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      # NOTE: 128 is the fixed drbd meta-LV size used here; do not
      # change without checking the drbd meta sizing rules
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # update the in-memory disk objects to match the on-node renames
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the blockdev_find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
  def _ExecD8Secondary(self, feedback_fn):
3880
    """Replace the secondary node for drbd8.
3881

3882
    The algorithm for replace is quite complicated:
3883
      - for all disks of the instance:
3884
        - create new LVs on the new node with same names
3885
        - shutdown the drbd device on the old secondary
3886
        - disconnect the drbd network on the primary
3887
        - create the drbd device on the new secondary
3888
        - network attach the drbd on the primary, using an artifice:
3889
          the drbd code for Attach() will connect to the network if it
3890
          finds a device which is connected to the good local disks but
3891
          not network enabled
3892
      - wait for sync across all devices
3893
      - remove all disks from the old secondary
3894

3895
    Failures are not very well handled.
3896

3897
    """
3898
    steps_total = 6
3899
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3900
    instance = self.instance
3901
    iv_names = {}
3902
    vgname = self.cfg.GetVGName()
3903
    # start of work
3904
    cfg = self.cfg
3905
    old_node = self.tgt_node
3906
    new_node = self.new_node
3907
    pri_node = instance.primary_node
3908

    
3909
    # Step: check device activation
3910
    self.proc.LogStep(1, steps_total, "check device existence")
3911
    info("checking volume groups")
3912
    my_vg = cfg.GetVGName()
3913
    results = rpc.call_vg_list([pri_node, new_node])
3914
    if not results:
3915
      raise errors.OpExecError("Can't list volume groups on the nodes")
3916
    for node in pri_node, new_node:
3917
      res = results.get(node, False)
3918
      if not res or my_vg not in res:
3919
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3920
                                 (my_vg, node))
3921
    for dev in instance.disks:
3922
      if not dev.iv_name in self.op.disks:
3923
        continue
3924
      info("checking %s on %s" % (dev.iv_name, pri_node))
3925
      cfg.SetDiskID(dev, pri_node)
3926
      if not rpc.call_blockdev_find(pri_node, dev):
3927
        raise errors.OpExecError("Can't find device %s on node %s" %
3928
                                 (dev.iv_name, pri_node))
3929

    
3930
    # Step: check other node consistency
3931
    self.proc.LogStep(2, steps_total, "check peer consistency")
3932
    for dev in instance.disks:
3933
      if not dev.iv_name in self.op.disks:
3934
        continue
3935
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3936
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3937
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3938
                                 " unsafe to replace the secondary" %
3939
                                 pri_node)
3940

    
3941
    # Step: create new storage
3942
    self.proc.LogStep(3, steps_total, "allocate new storage")
3943
    for dev in instance.disks:
3944
      size = dev.size
3945
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3946
      # since we *always* want to create this LV, we use the
3947
      # _Create...OnPrimary (which forces the creation), even if we
3948
      # are talking about the secondary node
3949
      for new_lv in dev.children:
3950
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3951
                                        _GetInstanceInfoText(instance)):
3952
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3953
                                   " node '%s'" %
3954
                                   (new_lv.logical_id[1], new_node))
3955

    
3956
      iv_names[dev.iv_name] = (dev, dev.children)
3957

    
3958
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3959
    for dev in instance.disks:
3960
      size = dev.size
3961
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3962
      # create new devices on new_node
3963
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3964
                              logical_id=(pri_node, new_node,
3965
                                          dev.logical_id[2]),
3966
                              children=dev.children)
3967
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3968
                                        new_drbd, False,
3969
                                      _GetInstanceInfoText(instance)):
3970
        raise errors.OpExecError("Failed to create new DRBD on"
3971
                                 " node '%s'" % new_node)
3972

    
3973
    for dev in instance.disks:
3974
      # we have new devices, shutdown the drbd on the old secondary
3975
      info("shutting down drbd for %s on old node" % dev.iv_name)
3976
      cfg.SetDiskID(dev, old_node)
3977
      if not rpc.call_blockdev_shutdown(old_node, dev):
3978
        warn