1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements (REQ_CLUSTER,
58
      REQ_MASTER); note that all commands require root permissions
59

60
  """
61
  HPATH = None
62
  HTYPE = None
63
  _OP_REQP = []
64
  REQ_CLUSTER = True
65
  REQ_MASTER = True
66

    
67
  def __init__(self, processor, op, cfg, sstore):
68
    """Constructor for LogicalUnit.
69

70
    This needs to be overridden in derived classes in order to check op
71
    validity.
72

73
    """
74
    self.proc = processor
75
    self.op = op
76
    self.cfg = cfg
77
    self.sstore = sstore
78
    self.__ssh = None
79

    
80
    for attr_name in self._OP_REQP:
81
      attr_val = getattr(op, attr_name, None)
82
      if attr_val is None:
83
        raise errors.OpPrereqError("Required parameter '%s' missing" %
84
                                   attr_name)
85
    if self.REQ_CLUSTER:
86
      if not cfg.IsCluster():
87
        raise errors.OpPrereqError("Cluster not initialized yet,"
88
                                   " use 'gnt-cluster init' first.")
89
      if self.REQ_MASTER:
90
        master = sstore.GetMasterNode()
91
        if master != utils.HostInfo().name:
92
          raise errors.OpPrereqError("Commands must be run on the master"
93
                                     " node %s" % master)
94

    
95
  def __GetSSH(self):
96
    """Returns the SshRunner object
97

98
    """
99
    if not self.__ssh:
100
      self.__ssh = ssh.SshRunner(self.sstore)
101
    return self.__ssh
102

    
103
  ssh = property(fget=__GetSSH)
104

    
105
  def CheckPrereq(self):
106
    """Check prerequisites for this LU.
107

108
    This method should check that the prerequisites for the execution
109
    of this LU are fulfilled. It can do internode communication, but
110
    it should be idempotent - no cluster or system changes are
111
    allowed.
112

113
    The method should raise errors.OpPrereqError in case something is
114
    not fulfilled. Its return value is ignored.
115

116
    This method should also update all the parameters of the opcode to
117
    their canonical form; e.g. a short node name must be fully
118
    expanded after this method has successfully completed (so that
119
    hooks, logging, etc. work correctly).
120

121
    """
122
    raise NotImplementedError
123

    
124
  def Exec(self, feedback_fn):
125
    """Execute the LU.
126

127
    This method should implement the actual work. It should raise
128
    errors.OpExecError for failures that are somewhat dealt with in
129
    code, or expected.
130

131
    """
132
    raise NotImplementedError
133

    
134
  def BuildHooksEnv(self):
135
    """Build hooks environment for this LU.
136

137
    This method should return a three-element tuple consisting of: a dict
138
    containing the environment that will be used for running the
139
    specific hook for this LU, a list of node names on which the hook
140
    should run before the execution, and a list of node names on which
141
    the hook should run after the execution.
142

143
    The keys of the dict must not be prefixed with 'GANETI_', as that will
144
    be handled in the hooks runner. Also note additional keys will be
145
    added by the hooks runner. If the LU doesn't define any
146
    environment, an empty dict (and not None) should be returned.
147

148
    If there are no nodes for a phase, an empty list (and not None)
    should be returned.
149

150
    Note that if the HPATH for a LU class is None, this function will
151
    not be called.
152

153
    """
154
    raise NotImplementedError
155

    
156
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
157
    """Notify the LU about the results of its hooks.
158

159
    This method is called every time a hooks phase is executed, and notifies
160
    the Logical Unit about the hooks' result. The LU can then use it to alter
161
    its result based on the hooks.  By default the method does nothing and the
162
    previous result is passed back unchanged, but any LU can define it if it
163
    wants to use the local cluster hook-scripts somehow.
164

165
    Args:
166
      phase: the hooks phase that has just been run
167
      hook_results: the results of the multi-node hooks rpc call
168
      feedback_fn: function to send feedback back to the caller
169
      lu_result: the previous result this LU had, or None in the PRE phase.
170

171
    """
172
    return lu_result
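
# Illustrative sketch only -- not used anywhere in this module.  It shows the
# contract described in the LogicalUnit docstring above: redefine HPATH and
# HTYPE, list the required opcode fields in _OP_REQP, canonicalize parameters
# in CheckPrereq, build the hooks environment, and do the work in Exec.  The
# LU name, hook path and the "message" opcode field are hypothetical.
class LUExampleNoop(LogicalUnit):
  """Example LU which only logs a message.

  """
  HPATH = "example-noop"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["message"]

  def BuildHooksEnv(self):
    # the env dict plus the pre- and post-execution node lists
    env = {"OP_TARGET": self.sstore.GetClusterName()}
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    # canonicalize/validate opcode parameters; no cluster changes allowed here
    self.op.message = str(self.op.message)

  def Exec(self, feedback_fn):
    feedback_fn("Example LU says: %s" % self.op.message)
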
173

    
174

    
175
class NoHooksLU(LogicalUnit):
176
  """Simple LU which runs no hooks.
177

178
  This LU is intended as a parent for other LogicalUnits which will
179
  run no hooks, in order to reduce duplicate code.
180

181
  """
182
  HPATH = None
183
  HTYPE = None
184

    
185

    
186
def _AddHostToEtcHosts(hostname):
187
  """Wrapper around utils.SetEtcHostsEntry.
188

189
  """
190
  hi = utils.HostInfo(name=hostname)
191
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
192

    
193

    
194
def _RemoveHostFromEtcHosts(hostname):
195
  """Wrapper around utils.RemoveEtcHostsEntry.
196

197
  """
198
  hi = utils.HostInfo(name=hostname)
199
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
200
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
201

    
202

    
203
def _GetWantedNodes(lu, nodes):
204
  """Returns list of checked and expanded node names.
205

206
  Args:
207
    nodes: List of nodes (strings) or None for all
208

209
  """
210
  if not isinstance(nodes, list):
211
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
212

    
213
  if nodes:
214
    wanted = []
215

    
216
    for name in nodes:
217
      node = lu.cfg.ExpandNodeName(name)
218
      if node is None:
219
        raise errors.OpPrereqError("No such node name '%s'" % name)
220
      wanted.append(node)
221

    
222
  else:
223
    wanted = lu.cfg.GetNodeList()
224
  return utils.NiceSort(wanted)
225

    
226

    
227
def _GetWantedInstances(lu, instances):
228
  """Returns list of checked and expanded instance names.
229

230
  Args:
231
    instances: List of instances (strings) or None for all
232

233
  """
234
  if not isinstance(instances, list):
235
    raise errors.OpPrereqError("Invalid argument type 'instances'")
236

    
237
  if instances:
238
    wanted = []
239

    
240
    for name in instances:
241
      instance = lu.cfg.ExpandInstanceName(name)
242
      if instance is None:
243
        raise errors.OpPrereqError("No such instance name '%s'" % name)
244
      wanted.append(instance)
245

    
246
  else:
247
    wanted = lu.cfg.GetInstanceList()
248
  return utils.NiceSort(wanted)
249

    
250

    
251
def _CheckOutputFields(static, dynamic, selected):
252
  """Checks whether all selected fields are valid.
253

254
  Args:
255
    static: Static fields
256
    dynamic: Dynamic fields
257

258
  """
259
  static_fields = frozenset(static)
260
  dynamic_fields = frozenset(dynamic)
261

    
262
  all_fields = static_fields | dynamic_fields
263

    
264
  if not all_fields.issuperset(selected):
265
    raise errors.OpPrereqError("Unknown output fields selected: %s"
266
                               % ",".join(frozenset(selected).
267
                                          difference(all_fields)))
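
# Illustrative sketch only -- a never-called helper demonstrating how
# _CheckOutputFields is used by the query LUs below.  The field names here
# are made up.
def _ExampleCheckOutputFieldsUsage():
  """Show that valid selections pass and unknown fields raise.

  """
  # valid: every selected field is either static or dynamic
  _CheckOutputFields(static=["name"], dynamic=["mfree"],
                     selected=["name", "mfree"])
  # invalid: "bogus" is unknown, so OpPrereqError is raised
  try:
    _CheckOutputFields(static=["name"], dynamic=["mfree"],
                       selected=["name", "bogus"])
  except errors.OpPrereqError:
    pass
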
268

    
269

    
270
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
271
                          memory, vcpus, nics):
272
  """Builds instance related env variables for hooks from single variables.
273

274
  Args:
275
    secondary_nodes: List of secondary nodes as strings
276
  """
277
  env = {
278
    "OP_TARGET": name,
279
    "INSTANCE_NAME": name,
280
    "INSTANCE_PRIMARY": primary_node,
281
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
282
    "INSTANCE_OS_TYPE": os_type,
283
    "INSTANCE_STATUS": status,
284
    "INSTANCE_MEMORY": memory,
285
    "INSTANCE_VCPUS": vcpus,
286
  }
287

    
288
  if nics:
289
    nic_count = len(nics)
290
    for idx, (ip, bridge, mac) in enumerate(nics):
291
      if ip is None:
292
        ip = ""
293
      env["INSTANCE_NIC%d_IP" % idx] = ip
294
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
295
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
296
  else:
297
    nic_count = 0
298

    
299
  env["INSTANCE_NIC_COUNT"] = nic_count
300

    
301
  return env
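
# Illustrative sketch only (never called): the environment produced by
# _BuildInstanceHookEnv for a hypothetical two-NIC instance.  All names and
# values below are made up.
def _ExampleBuildInstanceHookEnv():
  """Return the hook environment for a fictional instance.

  """
  nics = [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33"),
          (None, "xen-br1", "aa:00:00:44:55:66")]
  env = _BuildInstanceHookEnv("instance1.example.com", "node1.example.com",
                              ["node2.example.com"], "debian-etch", "up",
                              128, 1, nics)
  # env now contains e.g. "INSTANCE_PRIMARY": "node1.example.com",
  # "INSTANCE_NIC_COUNT": 2 and "INSTANCE_NIC1_IP": "" (None becomes "")
  return env
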
302

    
303

    
304
def _BuildInstanceHookEnvByObject(instance, override=None):
305
  """Builds instance related env variables for hooks from an object.
306

307
  Args:
308
    instance: objects.Instance object of instance
309
    override: dict of values to override
310
  """
311
  args = {
312
    'name': instance.name,
313
    'primary_node': instance.primary_node,
314
    'secondary_nodes': instance.secondary_nodes,
315
    'os_type': instance.os,
316
    'status': instance.os,
317
    'memory': instance.memory,
318
    'vcpus': instance.vcpus,
319
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
320
  }
321
  if override:
322
    args.update(override)
323
  return _BuildInstanceHookEnv(**args)
324

    
325

    
326
def _HasValidVG(vglist, vgname):
327
  """Checks if the volume group list is valid.
328

329
  A non-None return value means there's an error, and the return value
330
  is the error message.
331

332
  """
333
  vgsize = vglist.get(vgname, None)
334
  if vgsize is None:
335
    return "volume group '%s' missing" % vgname
336
  elif vgsize < 20480:
337
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
338
            (vgname, vgsize))
339
  return None
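
# Illustrative sketch only (never called): the three possible outcomes of
# _HasValidVG for a made-up volume group list as returned by rpc.call_vg_list.
def _ExampleHasValidVG():
  """Demonstrate the None-vs-error-message return convention.

  """
  vg_list = {"xenvg": 102400, "smallvg": 1024}
  ok = _HasValidVG(vg_list, "xenvg")          # None: large enough
  too_small = _HasValidVG(vg_list, "smallvg") # error string: below 20480 MiB
  missing = _HasValidVG(vg_list, "othervg")   # error string: group missing
  return ok, too_small, missing
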
340

    
341

    
342
def _InitSSHSetup(node):
343
  """Setup the SSH configuration for the cluster.
344

345

346
  This generates a dsa keypair for root, adds the pub key to the
347
  permitted hosts and adds the hostkey to its own known hosts.
348

349
  Args:
350
    node: the name of this host as a fqdn
351

352
  """
353
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
354

    
355
  for name in priv_key, pub_key:
356
    if os.path.exists(name):
357
      utils.CreateBackup(name)
358
    utils.RemoveFile(name)
359

    
360
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
361
                         "-f", priv_key,
362
                         "-q", "-N", ""])
363
  if result.failed:
364
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
365
                             result.output)
366

    
367
  f = open(pub_key, 'r')
368
  try:
369
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
370
  finally:
371
    f.close()
372

    
373

    
374
def _InitGanetiServerSetup(ss):
375
  """Setup the necessary configuration for the initial node daemon.
376

377
  This creates the nodepass file containing the shared password for
378
  the cluster and also generates the SSL certificate.
379

380
  """
381
  # Create pseudo random password
382
  randpass = sha.new(os.urandom(64)).hexdigest()
383
  # and write it into sstore
384
  ss.SetKey(ss.SS_NODED_PASS, randpass)
385

    
386
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
387
                         "-days", str(365*5), "-nodes", "-x509",
388
                         "-keyout", constants.SSL_CERT_FILE,
389
                         "-out", constants.SSL_CERT_FILE, "-batch"])
390
  if result.failed:
391
    raise errors.OpExecError("could not generate server ssl cert, command"
392
                             " %s had exitcode %s and error message %s" %
393
                             (result.cmd, result.exit_code, result.output))
394

    
395
  os.chmod(constants.SSL_CERT_FILE, 0400)
396

    
397
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
398

    
399
  if result.failed:
400
    raise errors.OpExecError("Could not start the node daemon, command %s"
401
                             " had exitcode %s and error %s" %
402
                             (result.cmd, result.exit_code, result.output))
403

    
404

    
405
def _CheckInstanceBridgesExist(instance):
406
  """Check that the brigdes needed by an instance exist.
407

408
  """
409
  # check bridges existence
410
  brlist = [nic.bridge for nic in instance.nics]
411
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
412
    raise errors.OpPrereqError("one or more target bridges %s does not"
413
                               " exist on destination node '%s'" %
414
                               (brlist, instance.primary_node))
415

    
416

    
417
class LUInitCluster(LogicalUnit):
418
  """Initialise the cluster.
419

420
  """
421
  HPATH = "cluster-init"
422
  HTYPE = constants.HTYPE_CLUSTER
423
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
424
              "def_bridge", "master_netdev", "file_storage_dir"]
425
  REQ_CLUSTER = False
426

    
427
  def BuildHooksEnv(self):
428
    """Build hooks env.
429

430
    Notes: Since we don't require a cluster, we must manually add
431
    ourselves to the post-run node list.
432

433
    """
434
    env = {"OP_TARGET": self.op.cluster_name}
435
    return env, [], [self.hostname.name]
436

    
437
  def CheckPrereq(self):
438
    """Verify that the passed name is a valid one.
439

440
    """
441
    if config.ConfigWriter.IsCluster():
442
      raise errors.OpPrereqError("Cluster is already initialised")
443

    
444
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
445
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
446
        raise errors.OpPrereqError("Please prepare the cluster VNC"
447
                                   "password file %s" %
448
                                   constants.VNC_PASSWORD_FILE)
449

    
450
    self.hostname = hostname = utils.HostInfo()
451

    
452
    if hostname.ip.startswith("127."):
453
      raise errors.OpPrereqError("This host's IP resolves to the private"
454
                                 " range (%s). Please fix DNS or %s." %
455
                                 (hostname.ip, constants.ETC_HOSTS))
456

    
457
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
458
                         source=constants.LOCALHOST_IP_ADDRESS):
459
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
460
                                 " to %s,\nbut this ip address does not"
461
                                 " belong to this host."
462
                                 " Aborting." % hostname.ip)
463

    
464
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
465

    
466
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
467
                     timeout=5):
468
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
469

    
470
    secondary_ip = getattr(self.op, "secondary_ip", None)
471
    if secondary_ip and not utils.IsValidIP(secondary_ip):
472
      raise errors.OpPrereqError("Invalid secondary ip given")
473
    if (secondary_ip and
474
        secondary_ip != hostname.ip and
475
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
476
                           source=constants.LOCALHOST_IP_ADDRESS))):
477
      raise errors.OpPrereqError("You gave %s as secondary IP,"
478
                                 " but it does not belong to this host." %
479
                                 secondary_ip)
480
    self.secondary_ip = secondary_ip
481

    
482
    if not hasattr(self.op, "vg_name"):
483
      self.op.vg_name = None
484
    # if vg_name not None, checks if volume group is valid
485
    if self.op.vg_name:
486
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
487
      if vgstatus:
488
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
489
                                   " you are not using lvm" % vgstatus)
490

    
491
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
492

    
493
    if not os.path.isabs(self.op.file_storage_dir):
494
      raise errors.OpPrereqError("The file storage directory you have is"
495
                                 " not an absolute path.")
496

    
497
    if not os.path.exists(self.op.file_storage_dir):
498
      try:
499
        os.makedirs(self.op.file_storage_dir, 0750)
500
      except OSError, err:
501
        raise errors.OpPrereqError("Cannot create file storage directory"
502
                                   " '%s': %s" %
503
                                   (self.op.file_storage_dir, err))
504

    
505
    if not os.path.isdir(self.op.file_storage_dir):
506
      raise errors.OpPrereqError("The file storage directory '%s' is not"
507
                                 " a directory." % self.op.file_storage_dir)
508

    
509
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
510
                    self.op.mac_prefix):
511
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
512
                                 self.op.mac_prefix)
513

    
514
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
515
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
516
                                 self.op.hypervisor_type)
517

    
518
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
519
    if result.failed:
520
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
521
                                 (self.op.master_netdev,
522
                                  result.output.strip()))
523

    
524
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
525
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
526
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
527
                                 " executable." % constants.NODE_INITD_SCRIPT)
528

    
529
  def Exec(self, feedback_fn):
530
    """Initialize the cluster.
531

532
    """
533
    clustername = self.clustername
534
    hostname = self.hostname
535

    
536
    # set up the simple store
537
    self.sstore = ss = ssconf.SimpleStore()
538
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
539
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
540
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
541
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
542
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
543
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
544

    
545
    # set up the inter-node password and certificate
546
    _InitGanetiServerSetup(ss)
547

    
548
    # start the master ip
549
    rpc.call_node_start_master(hostname.name)
550

    
551
    # set up ssh config and /etc/hosts
552
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
553
    try:
554
      sshline = f.read()
555
    finally:
556
      f.close()
557
    sshkey = sshline.split(" ")[1]
558

    
559
    _AddHostToEtcHosts(hostname.name)
560
    _InitSSHSetup(hostname.name)
561

    
562
    # init of cluster config file
563
    self.cfg = cfgw = config.ConfigWriter()
564
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
565
                    sshkey, self.op.mac_prefix,
566
                    self.op.vg_name, self.op.def_bridge)
567

    
568
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
569

    
570

    
571
class LUDestroyCluster(NoHooksLU):
572
  """Logical unit for destroying the cluster.
573

574
  """
575
  _OP_REQP = []
576

    
577
  def CheckPrereq(self):
578
    """Check prerequisites.
579

580
    This checks whether the cluster is empty.
581

582
    Any errors are signalled by raising errors.OpPrereqError.
583

584
    """
585
    master = self.sstore.GetMasterNode()
586

    
587
    nodelist = self.cfg.GetNodeList()
588
    if len(nodelist) != 1 or nodelist[0] != master:
589
      raise errors.OpPrereqError("There are still %d node(s) in"
590
                                 " this cluster." % (len(nodelist) - 1))
591
    instancelist = self.cfg.GetInstanceList()
592
    if instancelist:
593
      raise errors.OpPrereqError("There are still %d instance(s) in"
594
                                 " this cluster." % len(instancelist))
595

    
596
  def Exec(self, feedback_fn):
597
    """Destroys the cluster.
598

599
    """
600
    master = self.sstore.GetMasterNode()
601
    if not rpc.call_node_stop_master(master):
602
      raise errors.OpExecError("Could not disable the master role")
603
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
604
    utils.CreateBackup(priv_key)
605
    utils.CreateBackup(pub_key)
606
    rpc.call_node_leave_cluster(master)
607

    
608

    
609
class LUVerifyCluster(NoHooksLU):
610
  """Verifies the cluster status.
611

612
  """
613
  _OP_REQP = ["skip_checks"]
614

    
615
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
616
                  remote_version, feedback_fn):
617
    """Run multiple tests against a node.
618

619
    Test list:
620
      - compares ganeti version
621
      - checks vg existence and size > 20G
622
      - checks config file checksum
623
      - checks ssh to other nodes
624

625
    Args:
626
      node: name of the node to check
627
      file_list: required list of files
628
      local_cksum: dictionary of local files and their checksums
629

630
    """
631
    # compares ganeti version
632
    local_version = constants.PROTOCOL_VERSION
633
    if not remote_version:
634
      feedback_fn("  - ERROR: connection to %s failed" % (node))
635
      return True
636

    
637
    if local_version != remote_version:
638
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
639
                      (local_version, node, remote_version))
640
      return True
641

    
642
    # checks vg existence and size > 20G
643

    
644
    bad = False
645
    if not vglist:
646
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
647
                      (node,))
648
      bad = True
649
    else:
650
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
651
      if vgstatus:
652
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
653
        bad = True
654

    
655
    # checks config file checksum
656
    # checks ssh to any
657

    
658
    if 'filelist' not in node_result:
659
      bad = True
660
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
661
    else:
662
      remote_cksum = node_result['filelist']
663
      for file_name in file_list:
664
        if file_name not in remote_cksum:
665
          bad = True
666
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
667
        elif remote_cksum[file_name] != local_cksum[file_name]:
668
          bad = True
669
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
670

    
671
    if 'nodelist' not in node_result:
672
      bad = True
673
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
674
    else:
675
      if node_result['nodelist']:
676
        bad = True
677
        for node in node_result['nodelist']:
678
          feedback_fn("  - ERROR: communication with node '%s': %s" %
679
                          (node, node_result['nodelist'][node]))
680
    hyp_result = node_result.get('hypervisor', None)
681
    if hyp_result is not None:
682
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
683
    return bad
684

    
685
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
686
                      node_instance, feedback_fn):
687
    """Verify an instance.
688

689
    This function checks to see if the required block devices are
690
    available on the instance's node.
691

692
    """
693
    bad = False
694

    
695
    node_current = instanceconfig.primary_node
696

    
697
    node_vol_should = {}
698
    instanceconfig.MapLVsByNode(node_vol_should)
699

    
700
    for node in node_vol_should:
701
      for volume in node_vol_should[node]:
702
        if node not in node_vol_is or volume not in node_vol_is[node]:
703
          feedback_fn("  - ERROR: volume %s missing on node %s" %
704
                          (volume, node))
705
          bad = True
706

    
707
    if instanceconfig.status != 'down':
708
      if (node_current not in node_instance or
709
          not instance in node_instance[node_current]):
710
        feedback_fn("  - ERROR: instance %s not running on node %s" %
711
                        (instance, node_current))
712
        bad = True
713

    
714
    for node in node_instance:
715
      if node != node_current:
716
        if instance in node_instance[node]:
717
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
718
                          (instance, node))
719
          bad = True
720

    
721
    return bad
722

    
723
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
724
    """Verify if there are any unknown volumes in the cluster.
725

726
    The .os, .swap and backup volumes are ignored. All other volumes are
727
    reported as unknown.
728

729
    """
730
    bad = False
731

    
732
    for node in node_vol_is:
733
      for volume in node_vol_is[node]:
734
        if node not in node_vol_should or volume not in node_vol_should[node]:
735
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
736
                      (volume, node))
737
          bad = True
738
    return bad
739

    
740
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
741
    """Verify the list of running instances.
742

743
    This checks what instances are running but unknown to the cluster.
744

745
    """
746
    bad = False
747
    for node in node_instance:
748
      for runninginstance in node_instance[node]:
749
        if runninginstance not in instancelist:
750
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
751
                          (runninginstance, node))
752
          bad = True
753
    return bad
754

    
755
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
756
    """Verify N+1 Memory Resilience.
757

758
    Check that if one single node dies we can still start all the instances it
759
    was primary for.
760

761
    """
762
    bad = False
763

    
764
    for node, nodeinfo in node_info.iteritems():
765
      # This code checks that every node which is now listed as secondary has
766
      # enough memory to host all instances it is supposed to, should a single
767
      # other node in the cluster fail.
768
      # FIXME: not ready for failover to an arbitrary node
769
      # FIXME: does not support file-backed instances
770
      # WARNING: we currently take into account down instances as well as up
771
      # ones, considering that even if they're down someone might want to start
772
      # them even in the event of a node failure.
773
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
774
        needed_mem = 0
775
        for instance in instances:
776
          needed_mem += instance_cfg[instance].memory
777
        if nodeinfo['mfree'] < needed_mem:
778
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
779
                      " failovers should node %s fail" % (node, prinode))
780
          bad = True
781
    return bad
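
  # Illustrative sketch only: a never-called helper showing the arithmetic of
  # the N+1 check above with made-up nodes, instances and memory sizes.
  @staticmethod
  def _ExampleNPlusOneArithmetic():
    """Worked example: node2 is secondary for instances of node1 and node3.

    """
    node_info = {
      "node2": {
        "mfree": 1024,
        # instances for which node2 is secondary, grouped by primary node
        "sinst-by-pnode": {"node1": ["inst1", "inst2"], "node3": ["inst3"]},
        },
      }
    instance_memory = {"inst1": 512, "inst2": 256, "inst3": 2048}
    # should node1 fail, node2 needs 512 + 256 = 768 <= 1024 MiB: fine
    # should node3 fail, node2 needs 2048 > 1024 MiB: an N+1 error is reported
    return node_info, instance_memory
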
782

    
783
  def CheckPrereq(self):
784
    """Check prerequisites.
785

786
    Transform the list of checks we're going to skip into a set and check that
787
    all its members are valid.
788

789
    """
790
    self.skip_set = frozenset(self.op.skip_checks)
791
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
792
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
793

    
794
  def Exec(self, feedback_fn):
795
    """Verify integrity of cluster, performing various test on nodes.
796

797
    """
798
    bad = False
799
    feedback_fn("* Verifying global settings")
800
    for msg in self.cfg.VerifyConfig():
801
      feedback_fn("  - ERROR: %s" % msg)
802

    
803
    vg_name = self.cfg.GetVGName()
804
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
805
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
806
    i_non_redundant = [] # Non redundant instances
807
    node_volume = {}
808
    node_instance = {}
809
    node_info = {}
810
    instance_cfg = {}
811

    
812
    # FIXME: verify OS list
813
    # do local checksums
814
    file_names = list(self.sstore.GetFileList())
815
    file_names.append(constants.SSL_CERT_FILE)
816
    file_names.append(constants.CLUSTER_CONF_FILE)
817
    local_checksums = utils.FingerprintFiles(file_names)
818

    
819
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
820
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
821
    all_instanceinfo = rpc.call_instance_list(nodelist)
822
    all_vglist = rpc.call_vg_list(nodelist)
823
    node_verify_param = {
824
      'filelist': file_names,
825
      'nodelist': nodelist,
826
      'hypervisor': None,
827
      }
828
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
829
    all_rversion = rpc.call_version(nodelist)
830
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
831

    
832
    for node in nodelist:
833
      feedback_fn("* Verifying node %s" % node)
834
      result = self._VerifyNode(node, file_names, local_checksums,
835
                                all_vglist[node], all_nvinfo[node],
836
                                all_rversion[node], feedback_fn)
837
      bad = bad or result
838

    
839
      # node_volume
840
      volumeinfo = all_volumeinfo[node]
841

    
842
      if isinstance(volumeinfo, basestring):
843
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
844
                    (node, volumeinfo[-400:].encode('string_escape')))
845
        bad = True
846
        node_volume[node] = {}
847
      elif not isinstance(volumeinfo, dict):
848
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
849
        bad = True
850
        continue
851
      else:
852
        node_volume[node] = volumeinfo
853

    
854
      # node_instance
855
      nodeinstance = all_instanceinfo[node]
856
      if type(nodeinstance) != list:
857
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
858
        bad = True
859
        continue
860

    
861
      node_instance[node] = nodeinstance
862

    
863
      # node_info
864
      nodeinfo = all_ninfo[node]
865
      if not isinstance(nodeinfo, dict):
866
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
867
        bad = True
868
        continue
869

    
870
      try:
871
        node_info[node] = {
872
          "mfree": int(nodeinfo['memory_free']),
873
          "dfree": int(nodeinfo['vg_free']),
874
          "pinst": [],
875
          "sinst": [],
876
          # dictionary holding all instances this node is secondary for,
877
          # grouped by their primary node. Each key is a cluster node, and each
878
          # value is a list of instances which have the key as primary and the
879
          # current node as secondary.  This is handy to calculate N+1 memory
880
          # availability if you can only failover from a primary to its
881
          # secondary.
882
          "sinst-by-pnode": {},
883
        }
884
      except ValueError:
885
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
886
        bad = True
887
        continue
888

    
889
    node_vol_should = {}
890

    
891
    for instance in instancelist:
892
      feedback_fn("* Verifying instance %s" % instance)
893
      inst_config = self.cfg.GetInstanceInfo(instance)
894
      result =  self._VerifyInstance(instance, inst_config, node_volume,
895
                                     node_instance, feedback_fn)
896
      bad = bad or result
897

    
898
      inst_config.MapLVsByNode(node_vol_should)
899

    
900
      instance_cfg[instance] = inst_config
901

    
902
      pnode = inst_config.primary_node
903
      if pnode in node_info:
904
        node_info[pnode]['pinst'].append(instance)
905
      else:
906
        feedback_fn("  - ERROR: instance %s, connection to primary node"
907
                    " %s failed" % (instance, pnode))
908
        bad = True
909

    
910
      # If the instance is non-redundant we cannot survive losing its primary
911
      # node, so we are not N+1 compliant. On the other hand we have no disk
912
      # templates with more than one secondary so that situation is not well
913
      # supported either.
914
      # FIXME: does not support file-backed instances
915
      if len(inst_config.secondary_nodes) == 0:
916
        i_non_redundant.append(instance)
917
      elif len(inst_config.secondary_nodes) > 1:
918
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
919
                    % instance)
920

    
921
      for snode in inst_config.secondary_nodes:
922
        if snode in node_info:
923
          node_info[snode]['sinst'].append(instance)
924
          if pnode not in node_info[snode]['sinst-by-pnode']:
925
            node_info[snode]['sinst-by-pnode'][pnode] = []
926
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
927
        else:
928
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
929
                      " %s failed" % (instance, snode))
930

    
931
    feedback_fn("* Verifying orphan volumes")
932
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
933
                                       feedback_fn)
934
    bad = bad or result
935

    
936
    feedback_fn("* Verifying remaining instances")
937
    result = self._VerifyOrphanInstances(instancelist, node_instance,
938
                                         feedback_fn)
939
    bad = bad or result
940

    
941
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
942
      feedback_fn("* Verifying N+1 Memory redundancy")
943
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
944
      bad = bad or result
945

    
946
    feedback_fn("* Other Notes")
947
    if i_non_redundant:
948
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
949
                  % len(i_non_redundant))
950

    
951
    return int(bad)
952

    
953

    
954
class LUVerifyDisks(NoHooksLU):
955
  """Verifies the cluster disks status.
956

957
  """
958
  _OP_REQP = []
959

    
960
  def CheckPrereq(self):
961
    """Check prerequisites.
962

963
    This has no prerequisites.
964

965
    """
966
    pass
967

    
968
  def Exec(self, feedback_fn):
969
    """Verify integrity of cluster disks.
970

971
    """
972
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
973

    
974
    vg_name = self.cfg.GetVGName()
975
    nodes = utils.NiceSort(self.cfg.GetNodeList())
976
    instances = [self.cfg.GetInstanceInfo(name)
977
                 for name in self.cfg.GetInstanceList()]
978

    
979
    nv_dict = {}
980
    for inst in instances:
981
      inst_lvs = {}
982
      if (inst.status != "up" or
983
          inst.disk_template not in constants.DTS_NET_MIRROR):
984
        continue
985
      inst.MapLVsByNode(inst_lvs)
986
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
987
      for node, vol_list in inst_lvs.iteritems():
988
        for vol in vol_list:
989
          nv_dict[(node, vol)] = inst
990

    
991
    if not nv_dict:
992
      return result
993

    
994
    node_lvs = rpc.call_volume_list(nodes, vg_name)
995

    
996
    to_act = set()
997
    for node in nodes:
998
      # node_volume
999
      lvs = node_lvs[node]
1000

    
1001
      if isinstance(lvs, basestring):
1002
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1003
        res_nlvm[node] = lvs
1004
      elif not isinstance(lvs, dict):
1005
        logger.Info("connection to node %s failed or invalid data returned" %
1006
                    (node,))
1007
        res_nodes.append(node)
1008
        continue
1009

    
1010
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1011
        inst = nv_dict.pop((node, lv_name), None)
1012
        if (not lv_online and inst is not None
1013
            and inst.name not in res_instances):
1014
          res_instances.append(inst.name)
1015

    
1016
    # any leftover items in nv_dict are missing LVs, let's arrange the
1017
    # data better
1018
    for key, inst in nv_dict.iteritems():
1019
      if inst.name not in res_missing:
1020
        res_missing[inst.name] = []
1021
      res_missing[inst.name].append(key)
1022

    
1023
    return result
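
# Illustrative sketch only (never called): the nv_dict transformation done in
# LUVerifyDisks.Exec above, with made-up instance, node and LV names.
def _ExampleBuildNvDict():
  """Turn a per-instance {node: [lv, ...]} map into {(node, lv): instance}.

  """
  instance_name = "instance1.example.com"
  inst_lvs = {"node1": ["xenvg/disk0", "xenvg/disk1"],
              "node2": ["xenvg/disk0"]}
  nv_dict = {}
  for node, vol_list in inst_lvs.iteritems():
    for vol in vol_list:
      nv_dict[(node, vol)] = instance_name
  # nv_dict == {("node1", "xenvg/disk0"): instance_name, ...}
  return nv_dict
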
1024

    
1025

    
1026
class LURenameCluster(LogicalUnit):
1027
  """Rename the cluster.
1028

1029
  """
1030
  HPATH = "cluster-rename"
1031
  HTYPE = constants.HTYPE_CLUSTER
1032
  _OP_REQP = ["name"]
1033

    
1034
  def BuildHooksEnv(self):
1035
    """Build hooks env.
1036

1037
    """
1038
    env = {
1039
      "OP_TARGET": self.sstore.GetClusterName(),
1040
      "NEW_NAME": self.op.name,
1041
      }
1042
    mn = self.sstore.GetMasterNode()
1043
    return env, [mn], [mn]
1044

    
1045
  def CheckPrereq(self):
1046
    """Verify that the passed name is a valid one.
1047

1048
    """
1049
    hostname = utils.HostInfo(self.op.name)
1050

    
1051
    new_name = hostname.name
1052
    self.ip = new_ip = hostname.ip
1053
    old_name = self.sstore.GetClusterName()
1054
    old_ip = self.sstore.GetMasterIP()
1055
    if new_name == old_name and new_ip == old_ip:
1056
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1057
                                 " cluster has changed")
1058
    if new_ip != old_ip:
1059
      result = utils.RunCmd(["fping", "-q", new_ip])
1060
      if not result.failed:
1061
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1062
                                   " reachable on the network. Aborting." %
1063
                                   new_ip)
1064

    
1065
    self.op.name = new_name
1066

    
1067
  def Exec(self, feedback_fn):
1068
    """Rename the cluster.
1069

1070
    """
1071
    clustername = self.op.name
1072
    ip = self.ip
1073
    ss = self.sstore
1074

    
1075
    # shutdown the master IP
1076
    master = ss.GetMasterNode()
1077
    if not rpc.call_node_stop_master(master):
1078
      raise errors.OpExecError("Could not disable the master role")
1079

    
1080
    try:
1081
      # modify the sstore
1082
      ss.SetKey(ss.SS_MASTER_IP, ip)
1083
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1084

    
1085
      # Distribute updated ss config to all nodes
1086
      myself = self.cfg.GetNodeInfo(master)
1087
      dist_nodes = self.cfg.GetNodeList()
1088
      if myself.name in dist_nodes:
1089
        dist_nodes.remove(myself.name)
1090

    
1091
      logger.Debug("Copying updated ssconf data to all nodes")
1092
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1093
        fname = ss.KeyToFilename(keyname)
1094
        result = rpc.call_upload_file(dist_nodes, fname)
1095
        for to_node in dist_nodes:
1096
          if not result[to_node]:
1097
            logger.Error("copy of file %s to node %s failed" %
1098
                         (fname, to_node))
1099
    finally:
1100
      if not rpc.call_node_start_master(master):
1101
        logger.Error("Could not re-enable the master role on the master,"
1102
                     " please restart manually.")
1103

    
1104

    
1105
def _RecursiveCheckIfLVMBased(disk):
1106
  """Check if the given disk or its children are lvm-based.
1107

1108
  Args:
1109
    disk: ganeti.objects.Disk object
1110

1111
  Returns:
1112
    boolean indicating whether a LD_LV dev_type was found or not
1113

1114
  """
1115
  if disk.children:
1116
    for chdisk in disk.children:
1117
      if _RecursiveCheckIfLVMBased(chdisk):
1118
        return True
1119
  return disk.dev_type == constants.LD_LV
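
# Illustrative sketch only (never called): _RecursiveCheckIfLVMBased walks the
# whole disk tree, so a mirrored disk whose children are LVs is reported as
# lvm-based.  _FakeDisk is a stand-in for ganeti.objects.Disk, used here only
# to keep the example self-contained; "mirror" is a made-up dev_type.
class _FakeDisk(object):
  def __init__(self, dev_type, children=None):
    self.dev_type = dev_type
    self.children = children or []


def _ExampleRecursiveCheckIfLVMBased():
  """Return True: the mirrored disk has LV children.

  """
  mirrored = _FakeDisk("mirror",
                       children=[_FakeDisk(constants.LD_LV),
                                 _FakeDisk(constants.LD_LV)])
  return _RecursiveCheckIfLVMBased(mirrored)
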
1120

    
1121

    
1122
class LUSetClusterParams(LogicalUnit):
1123
  """Change the parameters of the cluster.
1124

1125
  """
1126
  HPATH = "cluster-modify"
1127
  HTYPE = constants.HTYPE_CLUSTER
1128
  _OP_REQP = []
1129

    
1130
  def BuildHooksEnv(self):
1131
    """Build hooks env.
1132

1133
    """
1134
    env = {
1135
      "OP_TARGET": self.sstore.GetClusterName(),
1136
      "NEW_VG_NAME": self.op.vg_name,
1137
      }
1138
    mn = self.sstore.GetMasterNode()
1139
    return env, [mn], [mn]
1140

    
1141
  def CheckPrereq(self):
1142
    """Check prerequisites.
1143

1144
    This checks whether the given params don't conflict and
1145
    if the given volume group is valid.
1146

1147
    """
1148
    if not self.op.vg_name:
1149
      instances = [self.cfg.GetInstanceInfo(name)
1150
                   for name in self.cfg.GetInstanceList()]
1151
      for inst in instances:
1152
        for disk in inst.disks:
1153
          if _RecursiveCheckIfLVMBased(disk):
1154
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1155
                                       " lvm-based instances exist")
1156

    
1157
    # if vg_name not None, checks given volume group on all nodes
1158
    if self.op.vg_name:
1159
      node_list = self.cfg.GetNodeList()
1160
      vglist = rpc.call_vg_list(node_list)
1161
      for node in node_list:
1162
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1163
        if vgstatus:
1164
          raise errors.OpPrereqError("Error on node '%s': %s" %
1165
                                     (node, vgstatus))
1166

    
1167
  def Exec(self, feedback_fn):
1168
    """Change the parameters of the cluster.
1169

1170
    """
1171
    if self.op.vg_name != self.cfg.GetVGName():
1172
      self.cfg.SetVGName(self.op.vg_name)
1173
    else:
1174
      feedback_fn("Cluster LVM configuration already in desired"
1175
                  " state, not changing")
1176

    
1177

    
1178
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1179
  """Sleep and poll for an instance's disk to sync.
1180

1181
  """
1182
  if not instance.disks:
1183
    return True
1184

    
1185
  if not oneshot:
1186
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1187

    
1188
  node = instance.primary_node
1189

    
1190
  for dev in instance.disks:
1191
    cfgw.SetDiskID(dev, node)
1192

    
1193
  retries = 0
1194
  while True:
1195
    max_time = 0
1196
    done = True
1197
    cumul_degraded = False
1198
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1199
    if not rstats:
1200
      proc.LogWarning("Can't get any data from node %s" % node)
1201
      retries += 1
1202
      if retries >= 10:
1203
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1204
                                 " aborting." % node)
1205
      time.sleep(6)
1206
      continue
1207
    retries = 0
1208
    for i in range(len(rstats)):
1209
      mstat = rstats[i]
1210
      if mstat is None:
1211
        proc.LogWarning("Can't compute data for node %s/%s" %
1212
                        (node, instance.disks[i].iv_name))
1213
        continue
1214
      # we ignore the ldisk parameter
1215
      perc_done, est_time, is_degraded, _ = mstat
1216
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1217
      if perc_done is not None:
1218
        done = False
1219
        if est_time is not None:
1220
          rem_time = "%d estimated seconds remaining" % est_time
1221
          max_time = est_time
1222
        else:
1223
          rem_time = "no time estimate"
1224
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1225
                     (instance.disks[i].iv_name, perc_done, rem_time))
1226
    if done or oneshot:
1227
      break
1228

    
1229
    if unlock:
1230
      #utils.Unlock('cmd')
1231
      pass
1232
    try:
1233
      time.sleep(min(60, max_time))
1234
    finally:
1235
      if unlock:
1236
        #utils.Lock('cmd')
1237
        pass
1238

    
1239
  if done:
1240
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1241
  return not cumul_degraded
1242

    
1243

    
1244
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1245
  """Check that mirrors are not degraded.
1246

1247
  The ldisk parameter, if True, will change the test from the
1248
  is_degraded attribute (which represents overall non-ok status for
1249
  the device(s)) to the ldisk (representing the local storage status).
1250

1251
  """
1252
  cfgw.SetDiskID(dev, node)
1253
  if ldisk:
1254
    idx = 6
1255
  else:
1256
    idx = 5
1257

    
1258
  result = True
1259
  if on_primary or dev.AssembleOnSecondary():
1260
    rstats = rpc.call_blockdev_find(node, dev)
1261
    if not rstats:
1262
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1263
      result = False
1264
    else:
1265
      result = result and (not rstats[idx])
1266
  if dev.children:
1267
    for child in dev.children:
1268
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1269

    
1270
  return result
1271

    
1272

    
1273
class LUDiagnoseOS(NoHooksLU):
1274
  """Logical unit for OS diagnose/query.
1275

1276
  """
1277
  _OP_REQP = ["output_fields", "names"]
1278

    
1279
  def CheckPrereq(self):
1280
    """Check prerequisites.
1281

1282
    This always succeeds, since this is a pure query LU.
1283

1284
    """
1285
    if self.op.names:
1286
      raise errors.OpPrereqError("Selective OS query not supported")
1287

    
1288
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1289
    _CheckOutputFields(static=[],
1290
                       dynamic=self.dynamic_fields,
1291
                       selected=self.op.output_fields)
1292

    
1293
  @staticmethod
1294
  def _DiagnoseByOS(node_list, rlist):
1295
    """Remaps a per-node return list into an a per-os per-node dictionary
1296

1297
      Args:
1298
        node_list: a list with the names of all nodes
1299
        rlist: a map with node names as keys and OS objects as values
1300

1301
      Returns:
1302
        map: a map with osnames as keys and as value another map, with
1303
             nodes as
1304
             keys and list of OS objects as values
1305
             e.g. {"debian-etch": {"node1": [<object>,...],
1306
                                   "node2": [<object>,]}
1307
                  }
1308

1309
    """
1310
    all_os = {}
1311
    for node_name, nr in rlist.iteritems():
1312
      if not nr:
1313
        continue
1314
      for os_obj in nr:
1315
        if os_obj.name not in all_os:
1316
          # build a list of nodes for this os containing empty lists
1317
          # for each node in node_list
1318
          all_os[os_obj.name] = {}
1319
          for nname in node_list:
1320
            all_os[os_obj.name][nname] = []
1321
        all_os[os_obj.name][node_name].append(os_obj)
1322
    return all_os
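
  # Illustrative sketch only: a never-called helper showing the remapping done
  # by _DiagnoseByOS above, using a tiny stand-in for the OS objects returned
  # by rpc.call_os_diagnose.  All names are made up.
  @staticmethod
  def _ExampleDiagnoseByOS():
    """Remap a fictional two-node result.

    """
    class FakeOS(object):
      def __init__(self, name):
        self.name = name

    rlist = {"node1": [FakeOS("debian-etch")], "node2": []}
    result = LUDiagnoseOS._DiagnoseByOS(["node1", "node2"], rlist)
    # result == {"debian-etch": {"node1": [<FakeOS>], "node2": []}}
    return result
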
1323

    
1324
  def Exec(self, feedback_fn):
1325
    """Compute the list of OSes.
1326

1327
    """
1328
    node_list = self.cfg.GetNodeList()
1329
    node_data = rpc.call_os_diagnose(node_list)
1330
    if node_data == False:
1331
      raise errors.OpExecError("Can't gather the list of OSes")
1332
    pol = self._DiagnoseByOS(node_list, node_data)
1333
    output = []
1334
    for os_name, os_data in pol.iteritems():
1335
      row = []
1336
      for field in self.op.output_fields:
1337
        if field == "name":
1338
          val = os_name
1339
        elif field == "valid":
1340
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1341
        elif field == "node_status":
1342
          val = {}
1343
          for node_name, nos_list in os_data.iteritems():
1344
            val[node_name] = [(v.status, v.path) for v in nos_list]
1345
        else:
1346
          raise errors.ParameterError(field)
1347
        row.append(val)
1348
      output.append(row)
1349

    
1350
    return output
1351

    
1352

    
1353
class LURemoveNode(LogicalUnit):
1354
  """Logical unit for removing a node.
1355

1356
  """
1357
  HPATH = "node-remove"
1358
  HTYPE = constants.HTYPE_NODE
1359
  _OP_REQP = ["node_name"]
1360

    
1361
  def BuildHooksEnv(self):
1362
    """Build hooks env.
1363

1364
    This doesn't run on the target node in the pre phase as a failed
1365
    node would not allow itself to run.
1366

1367
    """
1368
    env = {
1369
      "OP_TARGET": self.op.node_name,
1370
      "NODE_NAME": self.op.node_name,
1371
      }
1372
    all_nodes = self.cfg.GetNodeList()
1373
    all_nodes.remove(self.op.node_name)
1374
    return env, all_nodes, all_nodes
1375

    
1376
  def CheckPrereq(self):
1377
    """Check prerequisites.
1378

1379
    This checks:
1380
     - the node exists in the configuration
1381
     - it does not have primary or secondary instances
1382
     - it's not the master
1383

1384
    Any errors are signalled by raising errors.OpPrereqError.
1385

1386
    """
1387
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1388
    if node is None:
1389
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1390

    
1391
    instance_list = self.cfg.GetInstanceList()
1392

    
1393
    masternode = self.sstore.GetMasterNode()
1394
    if node.name == masternode:
1395
      raise errors.OpPrereqError("Node is the master node,"
1396
                                 " you need to failover first.")
1397

    
1398
    for instance_name in instance_list:
1399
      instance = self.cfg.GetInstanceInfo(instance_name)
1400
      if node.name == instance.primary_node:
1401
        raise errors.OpPrereqError("Instance %s still running on the node,"
1402
                                   " please remove first." % instance_name)
1403
      if node.name in instance.secondary_nodes:
1404
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1405
                                   " please remove first." % instance_name)
1406
    self.op.node_name = node.name
1407
    self.node = node
1408

    
1409
  def Exec(self, feedback_fn):
1410
    """Removes the node from the cluster.
1411

1412
    """
1413
    node = self.node
1414
    logger.Info("stopping the node daemon and removing configs from node %s" %
1415
                node.name)
1416

    
1417
    rpc.call_node_leave_cluster(node.name)
1418

    
1419
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1420

    
1421
    logger.Info("Removing node %s from config" % node.name)
1422

    
1423
    self.cfg.RemoveNode(node.name)
1424

    
1425
    _RemoveHostFromEtcHosts(node.name)
1426

    
1427

    
1428
class LUQueryNodes(NoHooksLU):
1429
  """Logical unit for querying nodes.
1430

1431
  """
1432
  _OP_REQP = ["output_fields", "names"]
1433

    
1434
  def CheckPrereq(self):
1435
    """Check prerequisites.
1436

1437
    This checks that the fields required are valid output fields.
1438

1439
    """
1440
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1441
                                     "mtotal", "mnode", "mfree",
1442
                                     "bootid"])
1443

    
1444
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1445
                               "pinst_list", "sinst_list",
1446
                               "pip", "sip"],
1447
                       dynamic=self.dynamic_fields,
1448
                       selected=self.op.output_fields)
1449

    
1450
    self.wanted = _GetWantedNodes(self, self.op.names)
1451

    
1452
  def Exec(self, feedback_fn):
1453
    """Computes the list of nodes and their attributes.
1454

1455
    """
1456
    nodenames = self.wanted
1457
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1458

    
1459
    # begin data gathering
1460

    
1461
    if self.dynamic_fields.intersection(self.op.output_fields):
1462
      live_data = {}
1463
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1464
      for name in nodenames:
1465
        nodeinfo = node_data.get(name, None)
1466
        if nodeinfo:
1467
          live_data[name] = {
1468
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1469
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1470
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1471
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1472
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1473
            "bootid": nodeinfo['bootid'],
1474
            }
1475
        else:
1476
          live_data[name] = {}
1477
    else:
1478
      live_data = dict.fromkeys(nodenames, {})
1479

    
1480
    node_to_primary = dict([(name, set()) for name in nodenames])
1481
    node_to_secondary = dict([(name, set()) for name in nodenames])
1482

    
1483
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1484
                             "sinst_cnt", "sinst_list"))
1485
    if inst_fields & frozenset(self.op.output_fields):
1486
      instancelist = self.cfg.GetInstanceList()
1487

    
1488
      for instance_name in instancelist:
1489
        inst = self.cfg.GetInstanceInfo(instance_name)
1490
        if inst.primary_node in node_to_primary:
1491
          node_to_primary[inst.primary_node].add(inst.name)
1492
        for secnode in inst.secondary_nodes:
1493
          if secnode in node_to_secondary:
1494
            node_to_secondary[secnode].add(inst.name)
1495

    
1496
    # end data gathering
1497

    
1498
    output = []
1499
    for node in nodelist:
1500
      node_output = []
1501
      for field in self.op.output_fields:
1502
        if field == "name":
1503
          val = node.name
1504
        elif field == "pinst_list":
1505
          val = list(node_to_primary[node.name])
1506
        elif field == "sinst_list":
1507
          val = list(node_to_secondary[node.name])
1508
        elif field == "pinst_cnt":
1509
          val = len(node_to_primary[node.name])
1510
        elif field == "sinst_cnt":
1511
          val = len(node_to_secondary[node.name])
1512
        elif field == "pip":
1513
          val = node.primary_ip
1514
        elif field == "sip":
1515
          val = node.secondary_ip
1516
        elif field in self.dynamic_fields:
1517
          val = live_data[node.name].get(field, None)
1518
        else:
1519
          raise errors.ParameterError(field)
1520
        node_output.append(val)
1521
      output.append(node_output)
1522

    
1523
    return output
1524

    
1525

    
1526
class LUQueryNodeVolumes(NoHooksLU):
1527
  """Logical unit for getting volumes on node(s).
1528

1529
  """
1530
  _OP_REQP = ["nodes", "output_fields"]
1531

    
1532
  def CheckPrereq(self):
1533
    """Check prerequisites.
1534

1535
    This checks that the fields required are valid output fields.
1536

1537
    """
1538
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1539

    
1540
    _CheckOutputFields(static=["node"],
1541
                       dynamic=["phys", "vg", "name", "size", "instance"],
1542
                       selected=self.op.output_fields)
1543

    
1544

    
1545
  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.
1547

1548
    """
1549
    nodenames = self.nodes
1550
    volumes = rpc.call_node_volumes(nodenames)
1551

    
1552
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1553
             in self.cfg.GetInstanceList()]
1554

    
1555
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1556

    
1557
    output = []
1558
    for node in nodenames:
1559
      if node not in volumes or not volumes[node]:
1560
        continue
1561

    
1562
      node_vols = volumes[node][:]
1563
      node_vols.sort(key=lambda vol: vol['dev'])
1564

    
1565
      for vol in node_vols:
1566
        node_output = []
1567
        for field in self.op.output_fields:
1568
          if field == "node":
1569
            val = node
1570
          elif field == "phys":
1571
            val = vol['dev']
1572
          elif field == "vg":
1573
            val = vol['vg']
1574
          elif field == "name":
1575
            val = vol['name']
1576
          elif field == "size":
1577
            val = int(float(vol['size']))
1578
          elif field == "instance":
1579
            for inst in ilist:
1580
              if node not in lv_by_node[inst]:
1581
                continue
1582
              if vol['name'] in lv_by_node[inst][node]:
1583
                val = inst.name
1584
                break
1585
            else:
1586
              val = '-'
1587
          else:
1588
            raise errors.ParameterError(field)
1589
          node_output.append(str(val))
1590

    
1591
        output.append(node_output)
1592

    
1593
    return output


class LUAddNode(LogicalUnit):
1597
  """Logical unit for adding node to the cluster.
1598

1599
  """
1600
  HPATH = "node-add"
1601
  HTYPE = constants.HTYPE_NODE
1602
  _OP_REQP = ["node_name"]
1603

    
1604
  def BuildHooksEnv(self):
1605
    """Build hooks env.
1606

1607
    This will run on all nodes before, and on all nodes + the new node after.
1608

1609
    """
1610
    env = {
1611
      "OP_TARGET": self.op.node_name,
1612
      "NODE_NAME": self.op.node_name,
1613
      "NODE_PIP": self.op.primary_ip,
1614
      "NODE_SIP": self.op.secondary_ip,
1615
      }
1616
    nodes_0 = self.cfg.GetNodeList()
1617
    nodes_1 = nodes_0 + [self.op.node_name, ]
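    # The tuple returned below is (hook environment, nodes to run the
    # pre-hook on, nodes to run the post-hook on).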
1618
    return env, nodes_0, nodes_1
1619

    
1620
  def CheckPrereq(self):
1621
    """Check prerequisites.
1622

1623
    This checks:
1624
     - the new node is not already in the config
1625
     - it is resolvable
1626
     - its parameters (single/dual homed) matches the cluster
1627

1628
    Any errors are signalled by raising errors.OpPrereqError.
1629

1630
    """
1631
    node_name = self.op.node_name
1632
    cfg = self.cfg
1633

    
1634
    dns_data = utils.HostInfo(node_name)
1635

    
1636
    node = dns_data.name
1637
    primary_ip = self.op.primary_ip = dns_data.ip
1638
    secondary_ip = getattr(self.op, "secondary_ip", None)
1639
    if secondary_ip is None:
1640
      secondary_ip = primary_ip
1641
    if not utils.IsValidIP(secondary_ip):
1642
      raise errors.OpPrereqError("Invalid secondary IP given")
1643
    self.op.secondary_ip = secondary_ip
1644

    
1645
    node_list = cfg.GetNodeList()
1646
    if not self.op.readd and node in node_list:
1647
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1648
                                 node)
1649
    elif self.op.readd and node not in node_list:
1650
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1651

    
1652
    for existing_node_name in node_list:
1653
      existing_node = cfg.GetNodeInfo(existing_node_name)
1654

    
1655
      if self.op.readd and node == existing_node_name:
1656
        if (existing_node.primary_ip != primary_ip or
1657
            existing_node.secondary_ip != secondary_ip):
1658
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1659
                                     " address configuration as before")
1660
        continue
1661

    
1662
      if (existing_node.primary_ip == primary_ip or
1663
          existing_node.secondary_ip == primary_ip or
1664
          existing_node.primary_ip == secondary_ip or
1665
          existing_node.secondary_ip == secondary_ip):
1666
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1667
                                   " existing node %s" % existing_node.name)
1668

    
1669
    # check that the type of the node (single versus dual homed) is the
1670
    # same as for the master
1671
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1672
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1673
    newbie_singlehomed = secondary_ip == primary_ip
1674
    if master_singlehomed != newbie_singlehomed:
1675
      if master_singlehomed:
1676
        raise errors.OpPrereqError("The master has no private ip but the"
1677
                                   " new node has one")
1678
      else:
1679
        raise errors.OpPrereqError("The master has a private ip but the"
1680
                                   " new node doesn't have one")
1681

    
1682
    # checks reachability
1683
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1684
      raise errors.OpPrereqError("Node not reachable by ping")
1685

    
1686
    if not newbie_singlehomed:
1687
      # check reachability from my secondary ip to newbie's secondary ip
1688
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1689
                           source=myself.secondary_ip):
1690
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1691
                                   " based ping to noded port")
1692

    
1693
    self.new_node = objects.Node(name=node,
1694
                                 primary_ip=primary_ip,
1695
                                 secondary_ip=secondary_ip)
1696

    
1697
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1698
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1699
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1700
                                   constants.VNC_PASSWORD_FILE)
1701

    
1702
  def Exec(self, feedback_fn):
1703
    """Adds the new node to the cluster.
1704

1705
    """
1706
    new_node = self.new_node
1707
    node = new_node.name
1708

    
1709
    # set up inter-node password and certificate and restarts the node daemon
1710
    gntpass = self.sstore.GetNodeDaemonPassword()
1711
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1712
      raise errors.OpExecError("ganeti password corruption detected")
1713
    f = open(constants.SSL_CERT_FILE)
1714
    try:
1715
      gntpem = f.read(8192)
1716
    finally:
1717
      f.close()
1718
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1719
    # so we use this to detect an invalid certificate; as long as the
1720
    # cert doesn't contain this, the here-document will be correctly
1721
    # parsed by the shell sequence below
1722
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1723
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1724
    if not gntpem.endswith("\n"):
1725
      raise errors.OpExecError("PEM must end with newline")
1726
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1727

    
1728
    # and then connect with ssh to set password and start ganeti-noded
1729
    # note that all the below variables are sanitized at this point,
1730
    # either by being constants or by the checks above
1731
    ss = self.sstore
1732
    mycommand = ("umask 077 && "
1733
                 "echo '%s' > '%s' && "
1734
                 "cat > '%s' << '!EOF.' && \n"
1735
                 "%s!EOF.\n%s restart" %
1736
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1737
                  constants.SSL_CERT_FILE, gntpem,
1738
                  constants.NODE_INITD_SCRIPT))
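    # The command above expands roughly to (all values shown here are
    # illustrative placeholders):
    #   umask 077 && echo '<node password>' > '<noded password file>' &&
    #   cat > '<SSL cert file>' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   <node init.d script> restart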
1739

    
1740
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1741
    if result.failed:
1742
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1743
                               " output: %s" %
1744
                               (node, result.fail_reason, result.output))
1745

    
1746
    # check connectivity
1747
    time.sleep(4)
1748

    
1749
    result = rpc.call_version([node])[node]
1750
    if result:
1751
      if constants.PROTOCOL_VERSION == result:
1752
        logger.Info("communication to node %s fine, sw version %s match" %
1753
                    (node, result))
1754
      else:
1755
        raise errors.OpExecError("Version mismatch master version %s,"
1756
                                 " node version %s" %
1757
                                 (constants.PROTOCOL_VERSION, result))
1758
    else:
1759
      raise errors.OpExecError("Cannot get version from the new node")
1760

    
1761
    # setup ssh on node
1762
    logger.Info("copy ssh key to node %s" % node)
1763
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1764
    keyarray = []
1765
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1766
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1767
                priv_key, pub_key]
1768

    
1769
    for i in keyfiles:
1770
      f = open(i, 'r')
1771
      try:
1772
        keyarray.append(f.read())
1773
      finally:
1774
        f.close()
1775

    
1776
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1777
                               keyarray[3], keyarray[4], keyarray[5])
1778

    
1779
    if not result:
1780
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1781

    
1782
    # Add node to our /etc/hosts, and add key to known_hosts
1783
    _AddHostToEtcHosts(new_node.name)
1784

    
1785
    if new_node.secondary_ip != new_node.primary_ip:
1786
      if not rpc.call_node_tcp_ping(new_node.name,
1787
                                    constants.LOCALHOST_IP_ADDRESS,
1788
                                    new_node.secondary_ip,
1789
                                    constants.DEFAULT_NODED_PORT,
1790
                                    10, False):
1791
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1792
                                 " you gave (%s). Please fix and re-run this"
1793
                                 " command." % new_node.secondary_ip)
1794

    
1795
    success, msg = self.ssh.VerifyNodeHostname(node)
1796
    if not success:
1797
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1798
                               " than the one the resolver gives: %s."
1799
                               " Please fix and re-run this command." %
1800
                               (node, msg))
1801

    
1802
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1803
    # including the node just added
1804
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1805
    dist_nodes = self.cfg.GetNodeList() + [node]
1806
    if myself.name in dist_nodes:
1807
      dist_nodes.remove(myself.name)
1808

    
1809
    logger.Debug("Copying hosts and known_hosts to all nodes")
1810
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1811
      result = rpc.call_upload_file(dist_nodes, fname)
1812
      for to_node in dist_nodes:
1813
        if not result[to_node]:
1814
          logger.Error("copy of file %s to node %s failed" %
1815
                       (fname, to_node))
1816

    
1817
    to_copy = ss.GetFileList()
1818
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1819
      to_copy.append(constants.VNC_PASSWORD_FILE)
1820
    for fname in to_copy:
1821
      if not self.ssh.CopyFileToNode(node, fname):
1822
        logger.Error("could not copy file %s to node %s" % (fname, node))
1823

    
1824
    if not self.op.readd:
1825
      logger.Info("adding node %s to cluster.conf" % node)
1826
      self.cfg.AddNode(new_node)
1827

    
1828

    
1829
class LUMasterFailover(LogicalUnit):
1830
  """Failover the master node to the current node.
1831

1832
  This is a special LU in that it must run on a non-master node.
1833

1834
  """
1835
  HPATH = "master-failover"
1836
  HTYPE = constants.HTYPE_CLUSTER
1837
  REQ_MASTER = False
1838
  _OP_REQP = []
1839

    
1840
  def BuildHooksEnv(self):
1841
    """Build hooks env.
1842

1843
    This will run on the new master only in the pre phase, and on all
1844
    the nodes in the post phase.
1845

1846
    """
1847
    env = {
1848
      "OP_TARGET": self.new_master,
1849
      "NEW_MASTER": self.new_master,
1850
      "OLD_MASTER": self.old_master,
1851
      }
1852
    return env, [self.new_master], self.cfg.GetNodeList()
1853

    
1854
  def CheckPrereq(self):
1855
    """Check prerequisites.
1856

1857
    This checks that we are not already the master.
1858

1859
    """
1860
    self.new_master = utils.HostInfo().name
1861
    self.old_master = self.sstore.GetMasterNode()
1862

    
1863
    if self.old_master == self.new_master:
1864
      raise errors.OpPrereqError("This command must be run on the node"
1865
                                 " where you want the new master to be."
1866
                                 " %s is already the master" %
1867
                                 self.old_master)
1868

    
1869
  def Exec(self, feedback_fn):
1870
    """Failover the master node.
1871

1872
    This command, when run on a non-master node, will cause the current
1873
    master to cease being master, and the non-master to become new
1874
    master.
1875

1876
    """
1877
    #TODO: do not rely on gethostname returning the FQDN
1878
    logger.Info("setting master to %s, old master: %s" %
1879
                (self.new_master, self.old_master))
1880

    
1881
    if not rpc.call_node_stop_master(self.old_master):
1882
      logger.Error("could not disable the master role on the old master"
1883
                   " %s, please disable manually" % self.old_master)
1884

    
1885
    ss = self.sstore
1886
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1887
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1888
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1889
      logger.Error("could not distribute the new simple store master file"
1890
                   " to the other nodes, please check.")
1891

    
1892
    if not rpc.call_node_start_master(self.new_master):
1893
      logger.Error("could not start the master role on the new master"
1894
                   " %s, please check" % self.new_master)
1895
      feedback_fn("Error in activating the master IP on the new master,"
1896
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
1932
  """Copy file to cluster.
1933

1934
  """
1935
  _OP_REQP = ["nodes", "filename"]
1936

    
1937
  def CheckPrereq(self):
1938
    """Check prerequisites.
1939

1940
    It should check that the named file exists and that the given list
1941
    of nodes is valid.
1942

1943
    """
1944
    if not os.path.exists(self.op.filename):
1945
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1946

    
1947
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1948

    
1949
  def Exec(self, feedback_fn):
1950
    """Copy a file from master to some nodes.
1951

1952
    Args:
      filename - the name of the file to copy (self.op.filename)
      nodes - the list of target node names; if empty, all cluster nodes
              are used (self.op.nodes)
1957

1958
    """
1959
    filename = self.op.filename
1960

    
1961
    myname = utils.HostInfo().name
1962

    
1963
    for node in self.nodes:
1964
      if node == myname:
1965
        continue
1966
      if not self.ssh.CopyFileToNode(node, filename):
1967
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1968

    
1969

    
1970
class LUDumpClusterConfig(NoHooksLU):
1971
  """Return a text-representation of the cluster-config.
1972

1973
  """
1974
  _OP_REQP = []
1975

    
1976
  def CheckPrereq(self):
1977
    """No prerequisites.
1978

1979
    """
1980
    pass
1981

    
1982
  def Exec(self, feedback_fn):
1983
    """Dump a representation of the cluster config to the standard output.
1984

1985
    """
1986
    return self.cfg.DumpConfig()
1987

    
1988

    
1989
class LURunClusterCommand(NoHooksLU):
1990
  """Run a command on some nodes.
1991

1992
  """
1993
  _OP_REQP = ["command", "nodes"]
1994

    
1995
  def CheckPrereq(self):
1996
    """Check prerequisites.
1997

1998
    It checks that the given list of nodes is valid.
1999

2000
    """
2001
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2002

    
2003
  def Exec(self, feedback_fn):
2004
    """Run a command on some nodes.
2005

2006
    """
2007
    # put the master at the end of the nodes list
2008
    master_node = self.sstore.GetMasterNode()
2009
    if master_node in self.nodes:
2010
      self.nodes.remove(master_node)
2011
      self.nodes.append(master_node)
2012

    
2013
    data = []
2014
    for node in self.nodes:
2015
      result = self.ssh.Run(node, "root", self.op.command)
2016
      data.append((node, result.output, result.exit_code))
2017

    
2018
    return data
2019

    
2020

    
2021
class LUActivateInstanceDisks(NoHooksLU):
2022
  """Bring up an instance's disks.
2023

2024
  """
2025
  _OP_REQP = ["instance_name"]
2026

    
2027
  def CheckPrereq(self):
2028
    """Check prerequisites.
2029

2030
    This checks that the instance is in the cluster.
2031

2032
    """
2033
    instance = self.cfg.GetInstanceInfo(
2034
      self.cfg.ExpandInstanceName(self.op.instance_name))
2035
    if instance is None:
2036
      raise errors.OpPrereqError("Instance '%s' not known" %
2037
                                 self.op.instance_name)
2038
    self.instance = instance
2039

    
2040

    
2041
  def Exec(self, feedback_fn):
2042
    """Activate the disks.
2043

2044
    """
2045
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2046
    if not disks_ok:
2047
      raise errors.OpExecError("Cannot activate block devices")
2048

    
2049
    return disks_info
2050

    
2051

    
2052
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2053
  """Prepare the block devices for an instance.
2054

2055
  This sets up the block devices on all nodes.
2056

2057
  Args:
2058
    instance: a ganeti.objects.Instance object
2059
    ignore_secondaries: if true, errors on secondary nodes won't result
2060
                        in an error return from the function
2061

2062
  Returns:
2063
    false if the operation failed
2064
    list of (host, instance_visible_name, node_visible_name) if the operation
2065
         succeeded with the mapping from node devices to instance devices
2066
  """
2067
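  # A successful call might return something like (all names below are
  # purely illustrative):
  #   (True, [("node1.example.com", "sda", "/dev/drbd0"),
  #           ("node1.example.com", "sdb", "/dev/drbd1")])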
  device_info = []
2068
  disks_ok = True
2069
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
2080
  for inst_disk in instance.disks:
2081
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2082
      cfg.SetDiskID(node_disk, node)
2083
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2084
      if not result:
2085
        logger.Error("could not prepare block device %s on node %s"
2086
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2087
        if not ignore_secondaries:
2088
          disks_ok = False
2089

    
2090
  # FIXME: race condition on drbd migration to primary
2091

    
2092
  # 2nd pass, do only the primary node
2093
  for inst_disk in instance.disks:
2094
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2095
      if node != instance.primary_node:
2096
        continue
2097
      cfg.SetDiskID(node_disk, node)
2098
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2099
      if not result:
2100
        logger.Error("could not prepare block device %s on node %s"
2101
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2102
        disks_ok = False
2103
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2104

    
2105
  # leave the disks configured for the primary node
2106
  # this is a workaround that would be fixed better by
2107
  # improving the logical/physical id handling
2108
  for disk in instance.disks:
2109
    cfg.SetDiskID(disk, instance.primary_node)
2110

    
2111
  return disks_ok, device_info
2112

    
2113

    
2114
def _StartInstanceDisks(cfg, instance, force):
2115
  """Start the disks of an instance.
2116

2117
  """
2118
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2119
                                           ignore_secondaries=force)
2120
  if not disks_ok:
2121
    _ShutdownInstanceDisks(instance, cfg)
2122
    if force is not None and not force:
2123
      logger.Error("If the message above refers to a secondary node,"
2124
                   " you can retry the operation using '--force'.")
2125
    raise errors.OpExecError("Disk consistency error")
2126

    
2127

    
2128
class LUDeactivateInstanceDisks(NoHooksLU):
2129
  """Shutdown an instance's disks.
2130

2131
  """
2132
  _OP_REQP = ["instance_name"]
2133

    
2134
  def CheckPrereq(self):
2135
    """Check prerequisites.
2136

2137
    This checks that the instance is in the cluster.
2138

2139
    """
2140
    instance = self.cfg.GetInstanceInfo(
2141
      self.cfg.ExpandInstanceName(self.op.instance_name))
2142
    if instance is None:
2143
      raise errors.OpPrereqError("Instance '%s' not known" %
2144
                                 self.op.instance_name)
2145
    self.instance = instance
2146

    
2147
  def Exec(self, feedback_fn):
2148
    """Deactivate the disks
2149

2150
    """
2151
    instance = self.instance
2152
    ins_l = rpc.call_instance_list([instance.primary_node])
2153
    ins_l = ins_l[instance.primary_node]
2154
    if not isinstance(ins_l, list):
2155
      raise errors.OpExecError("Can't contact node '%s'" %
2156
                               instance.primary_node)
2157

    
2158
    if self.instance.name in ins_l:
2159
      raise errors.OpExecError("Instance is running, can't shutdown"
2160
                               " block devices.")
2161

    
2162
    _ShutdownInstanceDisks(instance, self.cfg)
2163

    
2164

    
2165
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2166
  """Shutdown block devices of an instance.
2167

2168
  This does the shutdown on all nodes of the instance.
2169

2170
  If ignore_primary is true, errors on the primary node are ignored
  (they do not make the function return False).
2172

2173
  """
2174
  result = True
2175
  for disk in instance.disks:
2176
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2177
      cfg.SetDiskID(top_disk, node)
2178
      if not rpc.call_blockdev_shutdown(node, top_disk):
2179
        logger.Error("could not shutdown block device %s on node %s" %
2180
                     (disk.iv_name, node))
2181
        if not ignore_primary or node != instance.primary_node:
2182
          result = False
2183
  return result
2184

    
2185

    
2186
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2187
  """Checks if a node has enough free memory.
2188

2189
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2193

2194
  Args:
2195
    - cfg: a ConfigWriter instance
2196
    - node: the node name
2197
    - reason: string to use in the error message
2198
    - requested: the amount of memory in MiB
2199

2200
  """
2201
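  # Typical call, mirroring the way the start/failover LUs below use it:
  #   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
  #                        "starting instance %s" % instance.name,
  #                        instance.memory)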
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2202
  if not nodeinfo or not isinstance(nodeinfo, dict):
2203
    raise errors.OpPrereqError("Could not contact node %s for resource"
2204
                             " information" % (node,))
2205

    
2206
  free_mem = nodeinfo[node].get('memory_free')
2207
  if not isinstance(free_mem, int):
2208
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2209
                             " was '%s'" % (node, free_mem))
2210
  if requested > free_mem:
2211
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2212
                             " needed %s MiB, available %s MiB" %
2213
                             (node, reason, requested, free_mem))
2214

    
2215

    
2216
class LUStartupInstance(LogicalUnit):
2217
  """Starts an instance.
2218

2219
  """
2220
  HPATH = "instance-start"
2221
  HTYPE = constants.HTYPE_INSTANCE
2222
  _OP_REQP = ["instance_name", "force"]
2223

    
2224
  def BuildHooksEnv(self):
2225
    """Build hooks env.
2226

2227
    This runs on master, primary and secondary nodes of the instance.
2228

2229
    """
2230
    env = {
2231
      "FORCE": self.op.force,
2232
      }
2233
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2234
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2235
          list(self.instance.secondary_nodes))
2236
    return env, nl, nl
2237

    
2238
  def CheckPrereq(self):
2239
    """Check prerequisites.
2240

2241
    This checks that the instance is in the cluster.
2242

2243
    """
2244
    instance = self.cfg.GetInstanceInfo(
2245
      self.cfg.ExpandInstanceName(self.op.instance_name))
2246
    if instance is None:
2247
      raise errors.OpPrereqError("Instance '%s' not known" %
2248
                                 self.op.instance_name)
2249

    
2250
    # check bridge existence
2251
    _CheckInstanceBridgesExist(instance)
2252

    
2253
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2254
                         "starting instance %s" % instance.name,
2255
                         instance.memory)
2256

    
2257
    self.instance = instance
2258
    self.op.instance_name = instance.name
2259

    
2260
  def Exec(self, feedback_fn):
2261
    """Start the instance.
2262

2263
    """
2264
    instance = self.instance
2265
    force = self.op.force
2266
    extra_args = getattr(self.op, "extra_args", "")
2267

    
2268
    self.cfg.MarkInstanceUp(instance.name)
2269

    
2270
    node_current = instance.primary_node
2271

    
2272
    _StartInstanceDisks(self.cfg, instance, force)
2273

    
2274
    if not rpc.call_instance_start(node_current, instance, extra_args):
2275
      _ShutdownInstanceDisks(instance, self.cfg)
2276
      raise errors.OpExecError("Could not start instance")
2277

    
2278

    
2279
class LURebootInstance(LogicalUnit):
2280
  """Reboot an instance.
2281

2282
  """
2283
  HPATH = "instance-reboot"
2284
  HTYPE = constants.HTYPE_INSTANCE
2285
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2286

    
2287
  def BuildHooksEnv(self):
2288
    """Build hooks env.
2289

2290
    This runs on master, primary and secondary nodes of the instance.
2291

2292
    """
2293
    env = {
2294
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2295
      }
2296
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2297
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2298
          list(self.instance.secondary_nodes))
2299
    return env, nl, nl
2300

    
2301
  def CheckPrereq(self):
2302
    """Check prerequisites.
2303

2304
    This checks that the instance is in the cluster.
2305

2306
    """
2307
    instance = self.cfg.GetInstanceInfo(
2308
      self.cfg.ExpandInstanceName(self.op.instance_name))
2309
    if instance is None:
2310
      raise errors.OpPrereqError("Instance '%s' not known" %
2311
                                 self.op.instance_name)
2312

    
2313
    # check bridge existence
2314
    _CheckInstanceBridgesExist(instance)
2315

    
2316
    self.instance = instance
2317
    self.op.instance_name = instance.name
2318

    
2319
  def Exec(self, feedback_fn):
2320
    """Reboot the instance.
2321

2322
    """
2323
    instance = self.instance
2324
    ignore_secondaries = self.op.ignore_secondaries
2325
    reboot_type = self.op.reboot_type
2326
    extra_args = getattr(self.op, "extra_args", "")
2327

    
2328
    node_current = instance.primary_node
2329

    
2330
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2331
                           constants.INSTANCE_REBOOT_HARD,
2332
                           constants.INSTANCE_REBOOT_FULL]:
2333
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2334
                                  (constants.INSTANCE_REBOOT_SOFT,
2335
                                   constants.INSTANCE_REBOOT_HARD,
2336
                                   constants.INSTANCE_REBOOT_FULL))
2337

    
2338
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2339
                       constants.INSTANCE_REBOOT_HARD]:
2340
      if not rpc.call_instance_reboot(node_current, instance,
2341
                                      reboot_type, extra_args):
2342
        raise errors.OpExecError("Could not reboot instance")
2343
    else:
2344
      if not rpc.call_instance_shutdown(node_current, instance):
2345
        raise errors.OpExecError("could not shutdown instance for full reboot")
2346
      _ShutdownInstanceDisks(instance, self.cfg)
2347
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2348
      if not rpc.call_instance_start(node_current, instance, extra_args):
2349
        _ShutdownInstanceDisks(instance, self.cfg)
2350
        raise errors.OpExecError("Could not start instance for full reboot")
2351

    
2352
    self.cfg.MarkInstanceUp(instance.name)
2353

    
2354

    
2355
class LUShutdownInstance(LogicalUnit):
2356
  """Shutdown an instance.
2357

2358
  """
2359
  HPATH = "instance-stop"
2360
  HTYPE = constants.HTYPE_INSTANCE
2361
  _OP_REQP = ["instance_name"]
2362

    
2363
  def BuildHooksEnv(self):
2364
    """Build hooks env.
2365

2366
    This runs on master, primary and secondary nodes of the instance.
2367

2368
    """
2369
    env = _BuildInstanceHookEnvByObject(self.instance)
2370
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2371
          list(self.instance.secondary_nodes))
2372
    return env, nl, nl
2373

    
2374
  def CheckPrereq(self):
2375
    """Check prerequisites.
2376

2377
    This checks that the instance is in the cluster.
2378

2379
    """
2380
    instance = self.cfg.GetInstanceInfo(
2381
      self.cfg.ExpandInstanceName(self.op.instance_name))
2382
    if instance is None:
2383
      raise errors.OpPrereqError("Instance '%s' not known" %
2384
                                 self.op.instance_name)
2385
    self.instance = instance
2386

    
2387
  def Exec(self, feedback_fn):
2388
    """Shutdown the instance.
2389

2390
    """
2391
    instance = self.instance
2392
    node_current = instance.primary_node
2393
    self.cfg.MarkInstanceDown(instance.name)
2394
    if not rpc.call_instance_shutdown(node_current, instance):
2395
      logger.Error("could not shutdown instance")
2396

    
2397
    _ShutdownInstanceDisks(instance, self.cfg)
2398

    
2399

    
2400
class LUReinstallInstance(LogicalUnit):
2401
  """Reinstall an instance.
2402

2403
  """
2404
  HPATH = "instance-reinstall"
2405
  HTYPE = constants.HTYPE_INSTANCE
2406
  _OP_REQP = ["instance_name"]
2407

    
2408
  def BuildHooksEnv(self):
2409
    """Build hooks env.
2410

2411
    This runs on master, primary and secondary nodes of the instance.
2412

2413
    """
2414
    env = _BuildInstanceHookEnvByObject(self.instance)
2415
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2416
          list(self.instance.secondary_nodes))
2417
    return env, nl, nl
2418

    
2419
  def CheckPrereq(self):
2420
    """Check prerequisites.
2421

2422
    This checks that the instance is in the cluster and is not running.
2423

2424
    """
2425
    instance = self.cfg.GetInstanceInfo(
2426
      self.cfg.ExpandInstanceName(self.op.instance_name))
2427
    if instance is None:
2428
      raise errors.OpPrereqError("Instance '%s' not known" %
2429
                                 self.op.instance_name)
2430
    if instance.disk_template == constants.DT_DISKLESS:
2431
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2432
                                 self.op.instance_name)
2433
    if instance.status != "down":
2434
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2435
                                 self.op.instance_name)
2436
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2437
    if remote_info:
2438
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2439
                                 (self.op.instance_name,
2440
                                  instance.primary_node))
2441

    
2442
    self.op.os_type = getattr(self.op, "os_type", None)
2443
    if self.op.os_type is not None:
2444
      # OS verification
2445
      pnode = self.cfg.GetNodeInfo(
2446
        self.cfg.ExpandNodeName(instance.primary_node))
2447
      if pnode is None:
2448
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2449
                                   self.op.pnode)
2450
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2451
      if not os_obj:
2452
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2453
                                   " primary node"  % self.op.os_type)
2454

    
2455
    self.instance = instance
2456

    
2457
  def Exec(self, feedback_fn):
2458
    """Reinstall the instance.
2459

2460
    """
2461
    inst = self.instance
2462

    
2463
    if self.op.os_type is not None:
2464
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2465
      inst.os = self.op.os_type
2466
      self.cfg.AddInstance(inst)
2467

    
2468
    _StartInstanceDisks(self.cfg, inst, None)
2469
    try:
2470
      feedback_fn("Running the instance OS create scripts...")
2471
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2472
        raise errors.OpExecError("Could not install OS for instance %s"
2473
                                 " on node %s" %
2474
                                 (inst.name, inst.primary_node))
2475
    finally:
2476
      _ShutdownInstanceDisks(inst, self.cfg)
2477

    
2478

    
2479
class LURenameInstance(LogicalUnit):
2480
  """Rename an instance.
2481

2482
  """
2483
  HPATH = "instance-rename"
2484
  HTYPE = constants.HTYPE_INSTANCE
2485
  _OP_REQP = ["instance_name", "new_name"]
2486

    
2487
  def BuildHooksEnv(self):
2488
    """Build hooks env.
2489

2490
    This runs on master, primary and secondary nodes of the instance.
2491

2492
    """
2493
    env = _BuildInstanceHookEnvByObject(self.instance)
2494
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2495
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2496
          list(self.instance.secondary_nodes))
2497
    return env, nl, nl
2498

    
2499
  def CheckPrereq(self):
2500
    """Check prerequisites.
2501

2502
    This checks that the instance is in the cluster and is not running.
2503

2504
    """
2505
    instance = self.cfg.GetInstanceInfo(
2506
      self.cfg.ExpandInstanceName(self.op.instance_name))
2507
    if instance is None:
2508
      raise errors.OpPrereqError("Instance '%s' not known" %
2509
                                 self.op.instance_name)
2510
    if instance.status != "down":
2511
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2512
                                 self.op.instance_name)
2513
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2514
    if remote_info:
2515
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2516
                                 (self.op.instance_name,
2517
                                  instance.primary_node))
2518
    self.instance = instance
2519

    
2520
    # new name verification
2521
    name_info = utils.HostInfo(self.op.new_name)
2522

    
2523
    self.op.new_name = new_name = name_info.name
2524
    instance_list = self.cfg.GetInstanceList()
2525
    if new_name in instance_list:
2526
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2527
                                 new_name)
2528

    
2529
    if not getattr(self.op, "ignore_ip", False):
2530
      command = ["fping", "-q", name_info.ip]
2531
      result = utils.RunCmd(command)
2532
      if not result.failed:
2533
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2534
                                   (name_info.ip, new_name))
2535

    
2536

    
2537
  def Exec(self, feedback_fn):
    """Rename the instance.
2539

2540
    """
2541
    inst = self.instance
2542
    old_name = inst.name
2543

    
2544
    if inst.disk_template == constants.DT_FILE:
2545
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2546

    
2547
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2548

    
2549
    # re-read the instance from the configuration after rename
2550
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2551

    
2552
    if inst.disk_template == constants.DT_FILE:
2553
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2554
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2555
                                                old_file_storage_dir,
2556
                                                new_file_storage_dir)
2557

    
2558
      if not result:
2559
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2560
                                 " directory '%s' to '%s' (but the instance"
2561
                                 " has been renamed in Ganeti)" % (
2562
                                 inst.primary_node, old_file_storage_dir,
2563
                                 new_file_storage_dir))
2564

    
2565
      if not result[0]:
2566
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2567
                                 " (but the instance has been renamed in"
2568
                                 " Ganeti)" % (old_file_storage_dir,
2569
                                               new_file_storage_dir))
2570

    
2571
    _StartInstanceDisks(self.cfg, inst, None)
2572
    try:
2573
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2574
                                          "sda", "sdb"):
2575
        msg = ("Could not run OS rename script for instance %s on node %s (but the"
2576
               " instance has been renamed in Ganeti)" %
2577
               (inst.name, inst.primary_node))
2578
        logger.Error(msg)
2579
    finally:
2580
      _ShutdownInstanceDisks(inst, self.cfg)
2581

    
2582

    
2583
class LURemoveInstance(LogicalUnit):
2584
  """Remove an instance.
2585

2586
  """
2587
  HPATH = "instance-remove"
2588
  HTYPE = constants.HTYPE_INSTANCE
2589
  _OP_REQP = ["instance_name"]
2590

    
2591
  def BuildHooksEnv(self):
2592
    """Build hooks env.
2593

2594
    This runs on master, primary and secondary nodes of the instance.
2595

2596
    """
2597
    env = _BuildInstanceHookEnvByObject(self.instance)
2598
    nl = [self.sstore.GetMasterNode()]
2599
    return env, nl, nl
2600

    
2601
  def CheckPrereq(self):
2602
    """Check prerequisites.
2603

2604
    This checks that the instance is in the cluster.
2605

2606
    """
2607
    instance = self.cfg.GetInstanceInfo(
2608
      self.cfg.ExpandInstanceName(self.op.instance_name))
2609
    if instance is None:
2610
      raise errors.OpPrereqError("Instance '%s' not known" %
2611
                                 self.op.instance_name)
2612
    self.instance = instance
2613

    
2614
  def Exec(self, feedback_fn):
2615
    """Remove the instance.
2616

2617
    """
2618
    instance = self.instance
2619
    logger.Info("shutting down instance %s on node %s" %
2620
                (instance.name, instance.primary_node))
2621

    
2622
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2623
      if self.op.ignore_failures:
2624
        feedback_fn("Warning: can't shutdown instance")
2625
      else:
2626
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2627
                                 (instance.name, instance.primary_node))
2628

    
2629
    logger.Info("removing block devices for instance %s" % instance.name)
2630

    
2631
    if not _RemoveDisks(instance, self.cfg):
2632
      if self.op.ignore_failures:
2633
        feedback_fn("Warning: can't remove instance's disks")
2634
      else:
2635
        raise errors.OpExecError("Can't remove instance's disks")
2636

    
2637
    logger.Info("removing instance %s out of cluster config" % instance.name)
2638

    
2639
    self.cfg.RemoveInstance(instance.name)
2640

    
2641

    
2642
class LUQueryInstances(NoHooksLU):
2643
  """Logical unit for querying instances.
2644

2645
  """
2646
  _OP_REQP = ["output_fields", "names"]
2647

    
2648
  def CheckPrereq(self):
2649
    """Check prerequisites.
2650

2651
    This checks that the fields required are valid output fields.
2652

2653
    """
2654
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2655
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2656
                               "admin_state", "admin_ram",
2657
                               "disk_template", "ip", "mac", "bridge",
2658
                               "sda_size", "sdb_size", "vcpus"],
2659
                       dynamic=self.dynamic_fields,
2660
                       selected=self.op.output_fields)
2661

    
2662
    self.wanted = _GetWantedInstances(self, self.op.names)
2663

    
2664
  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.
2666

2667
    """
2668
    instance_names = self.wanted
2669
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2670
                     in instance_names]
2671

    
2672
    # begin data gathering
2673

    
2674
    nodes = frozenset([inst.primary_node for inst in instance_list])
2675

    
2676
    bad_nodes = []
2677
    if self.dynamic_fields.intersection(self.op.output_fields):
2678
      live_data = {}
2679
      node_data = rpc.call_all_instances_info(nodes)
2680
      for name in nodes:
2681
        result = node_data[name]
2682
        if result:
2683
          live_data.update(result)
2684
        elif result == False:
2685
          bad_nodes.append(name)
2686
        # else no instance is alive
2687
    else:
2688
      live_data = dict([(name, {}) for name in instance_names])
2689

    
2690
    # end data gathering
2691

    
2692
    output = []
2693
    for instance in instance_list:
2694
      iout = []
2695
      for field in self.op.output_fields:
2696
        if field == "name":
2697
          val = instance.name
2698
        elif field == "os":
2699
          val = instance.os
2700
        elif field == "pnode":
2701
          val = instance.primary_node
2702
        elif field == "snodes":
2703
          val = list(instance.secondary_nodes)
2704
        elif field == "admin_state":
2705
          val = (instance.status != "down")
2706
        elif field == "oper_state":
2707
          if instance.primary_node in bad_nodes:
2708
            val = None
2709
          else:
2710
            val = bool(live_data.get(instance.name))
2711
        elif field == "status":
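          # The value combines the configured state and the live state:
          #   running, ERROR_up, ERROR_down, ADMIN_down or ERROR_nodedown.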
2712
          if instance.primary_node in bad_nodes:
2713
            val = "ERROR_nodedown"
2714
          else:
2715
            running = bool(live_data.get(instance.name))
2716
            if running:
2717
              if instance.status != "down":
2718
                val = "running"
2719
              else:
2720
                val = "ERROR_up"
2721
            else:
2722
              if instance.status != "down":
2723
                val = "ERROR_down"
2724
              else:
2725
                val = "ADMIN_down"
2726
        elif field == "admin_ram":
2727
          val = instance.memory
2728
        elif field == "oper_ram":
2729
          if instance.primary_node in bad_nodes:
2730
            val = None
2731
          elif instance.name in live_data:
2732
            val = live_data[instance.name].get("memory", "?")
2733
          else:
2734
            val = "-"
2735
        elif field == "disk_template":
2736
          val = instance.disk_template
2737
        elif field == "ip":
2738
          val = instance.nics[0].ip
2739
        elif field == "bridge":
2740
          val = instance.nics[0].bridge
2741
        elif field == "mac":
2742
          val = instance.nics[0].mac
2743
        elif field == "sda_size" or field == "sdb_size":
2744
          disk = instance.FindDisk(field[:3])
2745
          if disk is None:
2746
            val = None
2747
          else:
2748
            val = disk.size
2749
        elif field == "vcpus":
2750
          val = instance.vcpus
2751
        else:
2752
          raise errors.ParameterError(field)
2753
        iout.append(val)
2754
      output.append(iout)
2755

    
2756
    return output
2757

    
2758

    
2759
class LUFailoverInstance(LogicalUnit):
2760
  """Failover an instance.
2761

2762
  """
2763
  HPATH = "instance-failover"
2764
  HTYPE = constants.HTYPE_INSTANCE
2765
  _OP_REQP = ["instance_name", "ignore_consistency"]
2766

    
2767
  def BuildHooksEnv(self):
2768
    """Build hooks env.
2769

2770
    This runs on master, primary and secondary nodes of the instance.
2771

2772
    """
2773
    env = {
2774
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2775
      }
2776
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2777
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2778
    return env, nl, nl
2779

    
2780
  def CheckPrereq(self):
2781
    """Check prerequisites.
2782

2783
    This checks that the instance is in the cluster.
2784

2785
    """
2786
    instance = self.cfg.GetInstanceInfo(
2787
      self.cfg.ExpandInstanceName(self.op.instance_name))
2788
    if instance is None:
2789
      raise errors.OpPrereqError("Instance '%s' not known" %
2790
                                 self.op.instance_name)
2791

    
2792
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2793
      raise errors.OpPrereqError("Instance's disk layout is not"
2794
                                 " network mirrored, cannot failover.")
2795

    
2796
    secondary_nodes = instance.secondary_nodes
2797
    if not secondary_nodes:
2798
      raise errors.ProgrammerError("no secondary node but using "
2799
                                   "DT_REMOTE_RAID1 template")
2800

    
2801
    target_node = secondary_nodes[0]
2802
    # check memory requirements on the secondary node
2803
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2804
                         instance.name, instance.memory)
2805

    
2806
    # check bridge existence
2807
    brlist = [nic.bridge for nic in instance.nics]
2808
    if not rpc.call_bridges_exist(target_node, brlist):
2809
      raise errors.OpPrereqError("One or more target bridges %s does not"
2810
                                 " exist on destination node '%s'" %
2811
                                 (brlist, target_node))
2812

    
2813
    self.instance = instance
2814

    
2815
  def Exec(self, feedback_fn):
2816
    """Failover an instance.
2817

2818
    The failover is done by shutting it down on its present node and
2819
    starting it on the secondary.
2820

2821
    """
2822
    instance = self.instance
2823

    
2824
    source_node = instance.primary_node
2825
    target_node = instance.secondary_nodes[0]
2826

    
2827
    feedback_fn("* checking disk consistency between source and target")
2828
    for dev in instance.disks:
2829
      # for remote_raid1, these are md over drbd
2830
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2831
        if instance.status == "up" and not self.op.ignore_consistency:
2832
          raise errors.OpExecError("Disk %s is degraded on target node,"
2833
                                   " aborting failover." % dev.iv_name)
2834

    
2835
    feedback_fn("* shutting down instance on source node")
2836
    logger.Info("Shutting down instance %s on node %s" %
2837
                (instance.name, source_node))
2838

    
2839
    if not rpc.call_instance_shutdown(source_node, instance):
2840
      if self.op.ignore_consistency:
2841
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2842
                     " anyway. Please make sure node %s is down"  %
2843
                     (instance.name, source_node, source_node))
2844
      else:
2845
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2846
                                 (instance.name, source_node))
2847

    
2848
    feedback_fn("* deactivating the instance's disks on source node")
2849
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2850
      raise errors.OpExecError("Can't shut down the instance's disks.")
2851

    
2852
    instance.primary_node = target_node
2853
    # distribute new instance config to the other nodes
2854
    self.cfg.AddInstance(instance)
2855

    
2856
    # Only start the instance if it's marked as up
2857
    if instance.status == "up":
2858
      feedback_fn("* activating the instance's disks on target node")
2859
      logger.Info("Starting instance %s on node %s" %
2860
                  (instance.name, target_node))
2861

    
2862
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2863
                                               ignore_secondaries=True)
2864
      if not disks_ok:
2865
        _ShutdownInstanceDisks(instance, self.cfg)
2866
        raise errors.OpExecError("Can't activate the instance's disks")
2867

    
2868
      feedback_fn("* starting the instance on the target node")
2869
      if not rpc.call_instance_start(target_node, instance, None):
2870
        _ShutdownInstanceDisks(instance, self.cfg)
2871
        raise errors.OpExecError("Could not start instance %s on node %s." %
2872
                                 (instance.name, target_node))
2873

    
2874

    
2875
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2876
  """Create a tree of block devices on the primary node.
2877

2878
  This always creates all devices.
2879

2880
  """
2881
  if device.children:
2882
    for child in device.children:
2883
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2884
        return False
2885

    
2886
  cfg.SetDiskID(device, node)
2887
  new_id = rpc.call_blockdev_create(node, device, device.size,
2888
                                    instance.name, True, info)
2889
  if not new_id:
2890
    return False
2891
  if device.physical_id is None:
2892
    device.physical_id = new_id
2893
  return True
2894

    
2895

    
2896
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2897
  """Create a tree of block devices on a secondary node.
2898

2899
  If this device type has to be created on secondaries, create it and
2900
  all its children.
2901

2902
  If not, just recurse to children keeping the same 'force' value.
2903

2904
  """
2905
  if device.CreateOnSecondary():
2906
    force = True
2907
  if device.children:
2908
    for child in device.children:
2909
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2910
                                        child, force, info):
2911
        return False
2912

    
2913
  if not force:
2914
    return True
2915
  cfg.SetDiskID(device, node)
2916
  new_id = rpc.call_blockdev_create(node, device, device.size,
2917
                                    instance.name, False, info)
2918
  if not new_id:
2919
    return False
2920
  if device.physical_id is None:
2921
    device.physical_id = new_id
2922
  return True
2923

    
2924

    
2925
def _GenerateUniqueNames(cfg, exts):
2926
  """Generate a suitable LV name.
2927

2928
  This will generate a logical volume name for the given instance.
2929

2930
  """
2931
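  # Example (with hypothetical generated IDs): exts=[".sda", ".sdb"] could
  # yield ["<id1>.sda", "<id2>.sdb"], one fresh unique ID per extension.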
  results = []
2932
  for val in exts:
2933
    new_id = cfg.GenerateUniqueID()
2934
    results.append("%s%s" % (new_id, val))
2935
  return results
2936

    
2937

    
2938
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2939
  """Generate a drbd device complete with its children.
2940

2941
  """
2942
  port = cfg.AllocatePort()
2943
  vgname = cfg.GetVGName()
2944
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2945
                          logical_id=(vgname, names[0]))
2946
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2947
                          logical_id=(vgname, names[1]))
2948
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2949
                          logical_id = (primary, secondary, port),
2950
                          children = [dev_data, dev_meta])
2951
  return drbd_dev
2952

    
2953

    
2954
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2955
  """Generate a drbd8 device complete with its children.
2956

2957
  """
2958
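  # Sketch of the result: one LD_DRBD8 disk whose logical_id is the tuple
  # (primary, secondary, port) and whose children are a data LV of the
  # requested size plus a 128 MiB metadata LV, both named from 'names'.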
  port = cfg.AllocatePort()
2959
  vgname = cfg.GetVGName()
2960
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2961
                          logical_id=(vgname, names[0]))
2962
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2963
                          logical_id=(vgname, names[1]))
2964
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2965
                          logical_id = (primary, secondary, port),
2966
                          children = [dev_data, dev_meta],
2967
                          iv_name=iv_name)
2968
  return drbd_dev
2969

    
2970

    
2971
def _GenerateDiskTemplate(cfg, template_name,
2972
                          instance_name, primary_node,
2973
                          secondary_nodes, disk_sz, swap_sz,
2974
                          file_storage_dir, file_driver):
2975
  """Generate the entire disk layout for a given template type.
2976

2977
  """
2978
  #TODO: compute space requirements
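  # Summary of the layouts produced below:
  #   diskless: no disks
  #   plain:    two LVs (sda, sdb)
  #   drbd8:    two DRBD8 devices (sda, sdb), each over a data LV and a meta LV
  #   file:     two file-based disks (sda, sdb) under file_storage_dir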
2979

    
2980
  vgname = cfg.GetVGName()
2981
  if template_name == constants.DT_DISKLESS:
2982
    disks = []
2983
  elif template_name == constants.DT_PLAIN:
2984
    if len(secondary_nodes) != 0:
2985
      raise errors.ProgrammerError("Wrong template configuration")
2986

    
2987
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2988
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2989
                           logical_id=(vgname, names[0]),
2990
                           iv_name = "sda")
2991
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2992
                           logical_id=(vgname, names[1]),
2993
                           iv_name = "sdb")
2994
    disks = [sda_dev, sdb_dev]
2995
  elif template_name == constants.DT_DRBD8:
2996
    if len(secondary_nodes) != 1:
2997
      raise errors.ProgrammerError("Wrong template configuration")
2998
    remote_node = secondary_nodes[0]
2999
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3000
                                       ".sdb_data", ".sdb_meta"])
3001
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3002
                                         disk_sz, names[0:2], "sda")
3003
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3004
                                         swap_sz, names[2:4], "sdb")
3005
    disks = [drbd_sda_dev, drbd_sdb_dev]
3006
  elif template_name == constants.DT_FILE:
3007
    if len(secondary_nodes) != 0:
3008
      raise errors.ProgrammerError("Wrong template configuration")
3009

    
3010
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3011
                                iv_name="sda", logical_id=(file_driver,
3012
                                "%s/sda" % file_storage_dir))
3013
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3014
                                iv_name="sdb", logical_id=(file_driver,
3015
                                "%s/sdb" % file_storage_dir))
3016
    disks = [file_sda_dev, file_sdb_dev]
3017
  else:
3018
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3019
  return disks
3020

    
3021

    
3022
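# Illustrative sketch, not part of the original module: driving the layout
# generator for a plain (lvm-only) instance.  `cfg` is assumed to be an
# initialized config.ConfigWriter; instance and node names are made up, and
# the file storage arguments are unused for this template.
def _ExamplePlainLayout(cfg):
  disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN,
                                "instance1.example.com", "node1.example.com",
                                [], 10240, 4096, None, None)
  # disks holds two LD_LV objects.Disk entries, iv_name "sda" and "sdb"
  return disks

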
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


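# Illustrative sketch, not part of the original module: the calling pattern
# used further down by LUCreateInstance.Exec -- create all disks and undo any
# partial work with _RemoveDisks if the creation failed.  `cfg` and `inst` are
# assumed to be a config.ConfigWriter and a fully built objects.Instance.
def _ExampleCreateDisksOrRollback(cfg, inst):
  if not _CreateDisks(cfg, inst):
    _RemoveDisks(inst, cfg)
    raise errors.OpExecError("Device creation failed, reverting...")

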
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


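# Illustrative example, not part of the original module: for a drbd8 instance
# with a 10240MB data disk and 4096MB of swap, each node needs room for both
# LVs plus 2 x 128MB of DRBD metadata in its volume group:
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096) == 10240 + 4096 + 256

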
class LUCreateInstance(LogicalUnit):
3136
  """Create an instance.
3137

3138
  """
3139
  HPATH = "instance-add"
3140
  HTYPE = constants.HTYPE_INSTANCE
3141
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3142
              "disk_template", "swap_size", "mode", "start", "vcpus",
3143
              "wait_for_sync", "ip_check", "mac"]
3144

    
3145
  def _RunAllocator(self):
3146
    """Run the allocator based on input opcode.
3147

3148
    """
3149
    disks = [{"size": self.op.disk_size, "mode": "w"},
3150
             {"size": self.op.swap_size, "mode": "w"}]
3151
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3152
             "bridge": self.op.bridge}]
3153
    ial = IAllocator(self.cfg, self.sstore,
3154
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3155
                     name=self.op.instance_name,
3156
                     disk_template=self.op.disk_template,
3157
                     tags=[],
3158
                     os=self.op.os_type,
3159
                     vcpus=self.op.vcpus,
3160
                     mem_size=self.op.mem_size,
3161
                     disks=disks,
3162
                     nics=nics,
3163
                     )
3164

    
3165
    ial.Run(self.op.iallocator)
3166

    
3167
    if not ial.success:
3168
      raise errors.OpPrereqError("Can't compute nodes using"
3169
                                 " iallocator '%s': %s" % (self.op.iallocator,
3170
                                                           ial.info))
3171
    if len(ial.nodes) != ial.required_nodes:
3172
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3173
                                 " of nodes (%s), required %s" %
3174
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3175
    self.op.pnode = ial.nodes[0]
3176
    logger.ToStdout("Selected nodes for the instance: %s" %
3177
                    (", ".join(ial.nodes),))
3178
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3179
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3180
    if ial.required_nodes == 2:
3181
      self.op.snode = ial.nodes[1]
3182

    
3183
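  # Illustrative note, not part of the original code: for an instance with a
  # 10240MB disk and 4096MB of swap, the allocator input built above is,
  # schematically (all values other than the sizes are made up):
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  # and the chosen node names come back in ial.nodes.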
  def BuildHooksEnv(self):
3184
    """Build hooks env.
3185

3186
    This runs on master, primary and secondary nodes of the instance.
3187

3188
    """
3189
    env = {
3190
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3191
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3192
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3193
      "INSTANCE_ADD_MODE": self.op.mode,
3194
      }
3195
    if self.op.mode == constants.INSTANCE_IMPORT:
3196
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3197
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3198
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3199

    
3200
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3201
      primary_node=self.op.pnode,
3202
      secondary_nodes=self.secondaries,
3203
      status=self.instance_status,
3204
      os_type=self.op.os_type,
3205
      memory=self.op.mem_size,
3206
      vcpus=self.op.vcpus,
3207
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3208
    ))
3209

    
3210
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3211
          self.secondaries)
3212
    return env, nl, nl
3213

    
3214

    
3215
  def CheckPrereq(self):
3216
    """Check prerequisites.
3217

3218
    """
3219
    # set optional parameters to none if they don't exist
3220
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3221
                 "iallocator"]:
3222
      if not hasattr(self.op, attr):
3223
        setattr(self.op, attr, None)
3224

    
3225
    if self.op.mode not in (constants.INSTANCE_CREATE,
3226
                            constants.INSTANCE_IMPORT):
3227
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3228
                                 self.op.mode)
3229

    
3230
    if (not self.cfg.GetVGName() and
3231
        self.op.disk_template not in constants.DTS_NOT_LVM):
3232
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3233
                                 " instances")
3234

    
3235
    if self.op.mode == constants.INSTANCE_IMPORT:
3236
      src_node = getattr(self.op, "src_node", None)
3237
      src_path = getattr(self.op, "src_path", None)
3238
      if src_node is None or src_path is None:
3239
        raise errors.OpPrereqError("Importing an instance requires source"
3240
                                   " node and path options")
3241
      src_node_full = self.cfg.ExpandNodeName(src_node)
3242
      if src_node_full is None:
3243
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3244
      self.op.src_node = src_node = src_node_full
3245

    
3246
      if not os.path.isabs(src_path):
3247
        raise errors.OpPrereqError("The source path must be absolute")
3248

    
3249
      export_info = rpc.call_export_info(src_node, src_path)
3250

    
3251
      if not export_info:
3252
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3253

    
3254
      if not export_info.has_section(constants.INISECT_EXP):
3255
        raise errors.ProgrammerError("Corrupted export config")
3256

    
3257
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3258
      if (int(ei_version) != constants.EXPORT_VERSION):
3259
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3260
                                   (ei_version, constants.EXPORT_VERSION))
3261

    
3262
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3263
        raise errors.OpPrereqError("Can't import instance with more than"
3264
                                   " one data disk")
3265

    
3266
      # FIXME: are the old os-es, disk sizes, etc. useful?
3267
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3268
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3269
                                                         'disk0_dump'))
3270
      self.src_image = diskimage
3271
    else: # INSTANCE_CREATE
3272
      if getattr(self.op, "os_type", None) is None:
3273
        raise errors.OpPrereqError("No guest OS specified")
3274

    
3275
    #### instance parameters check
3276

    
3277
    # disk template and mirror node verification
3278
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3279
      raise errors.OpPrereqError("Invalid disk template name")
3280

    
3281
    # instance name verification
3282
    hostname1 = utils.HostInfo(self.op.instance_name)
3283

    
3284
    self.op.instance_name = instance_name = hostname1.name
3285
    instance_list = self.cfg.GetInstanceList()
3286
    if instance_name in instance_list:
3287
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3288
                                 instance_name)
3289

    
3290
    # ip validity checks
3291
    ip = getattr(self.op, "ip", None)
3292
    if ip is None or ip.lower() == "none":
3293
      inst_ip = None
3294
    elif ip.lower() == "auto":
3295
      inst_ip = hostname1.ip
3296
    else:
3297
      if not utils.IsValidIP(ip):
3298
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3299
                                   " like a valid IP" % ip)
3300
      inst_ip = ip
3301
    self.inst_ip = self.op.ip = inst_ip
3302

    
3303
    if self.op.start and not self.op.ip_check:
3304
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3305
                                 " adding an instance in start mode")
3306

    
3307
    if self.op.ip_check:
3308
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3309
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3310
                                   (hostname1.ip, instance_name))
3311

    
3312
    # MAC address verification
3313
    if self.op.mac != "auto":
3314
      if not utils.IsValidMac(self.op.mac.lower()):
3315
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3316
                                   self.op.mac)
3317

    
3318
    # bridge verification
3319
    bridge = getattr(self.op, "bridge", None)
3320
    if bridge is None:
3321
      self.op.bridge = self.cfg.GetDefBridge()
3322
    else:
3323
      self.op.bridge = bridge
3324

    
3325
    # boot order verification
3326
    if self.op.hvm_boot_order is not None:
3327
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3328
        raise errors.OpPrereqError("invalid boot order specified,"
3329
                                   " must be one or more of [acdn]")
3330
    # file storage checks
3331
    if (self.op.file_driver and
3332
        not self.op.file_driver in constants.FILE_DRIVER):
3333
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3334
                                 self.op.file_driver)
3335

    
3336
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3337
      raise errors.OpPrereqError("File storage directory must be a relative"
                                 " path")
3339
    #### allocator run
3340

    
3341
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3342
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3343
                                 " node must be given")
3344

    
3345
    if self.op.iallocator is not None:
3346
      self._RunAllocator()
3347

    
3348
    #### node related checks
3349

    
3350
    # check primary node
3351
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3352
    if pnode is None:
3353
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3354
                                 self.op.pnode)
3355
    self.op.pnode = pnode.name
3356
    self.pnode = pnode
3357
    self.secondaries = []
3358

    
3359
    # mirror node verification
3360
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3361
      if getattr(self.op, "snode", None) is None:
3362
        raise errors.OpPrereqError("The networked disk templates need"
3363
                                   " a mirror node")
3364

    
3365
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3366
      if snode_name is None:
3367
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3368
                                   self.op.snode)
3369
      elif snode_name == pnode.name:
3370
        raise errors.OpPrereqError("The secondary node cannot be"
3371
                                   " the primary node.")
3372
      self.secondaries.append(snode_name)
3373

    
3374
    req_size = _ComputeDiskSize(self.op.disk_template,
3375
                                self.op.disk_size, self.op.swap_size)
3376

    
3377
    # Check lv size requirements
3378
    if req_size is not None:
3379
      nodenames = [pnode.name] + self.secondaries
3380
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3381
      for node in nodenames:
3382
        info = nodeinfo.get(node, None)
3383
        if not info:
3384
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
3386
        vg_free = info.get('vg_free', None)
3387
        if not isinstance(vg_free, int):
3388
          raise errors.OpPrereqError("Can't compute free disk space on"
3389
                                     " node %s" % node)
3390
        if req_size > info['vg_free']:
3391
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3392
                                     " %d MB available, %d MB required" %
3393
                                     (node, info['vg_free'], req_size))
3394

    
3395
    # os verification
3396
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3397
    if not os_obj:
3398
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3399
                                 " primary node"  % self.op.os_type)
3400

    
3401
    if self.op.kernel_path == constants.VALUE_NONE:
3402
      raise errors.OpPrereqError("Can't set instance kernel to none")
3403

    
3404

    
3405
    # bridge check on primary node
3406
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3407
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3408
                                 " destination node '%s'" %
3409
                                 (self.op.bridge, pnode.name))
3410

    
3411
    if self.op.start:
3412
      self.instance_status = 'up'
3413
    else:
3414
      self.instance_status = 'down'
3415

    
3416
  def Exec(self, feedback_fn):
3417
    """Create and add the instance to the cluster.
3418

3419
    """
3420
    instance = self.op.instance_name
3421
    pnode_name = self.pnode.name
3422

    
3423
    if self.op.mac == "auto":
3424
      mac_address = self.cfg.GenerateMAC()
3425
    else:
3426
      mac_address = self.op.mac
3427

    
3428
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3429
    if self.inst_ip is not None:
3430
      nic.ip = self.inst_ip
3431

    
3432
    ht_kind = self.sstore.GetHypervisorType()
3433
    if ht_kind in constants.HTS_REQ_PORT:
3434
      network_port = self.cfg.AllocatePort()
3435
    else:
3436
      network_port = None
3437

    
3438
    # this is needed because os.path.join does not accept None arguments
3439
    if self.op.file_storage_dir is None:
3440
      string_file_storage_dir = ""
3441
    else:
3442
      string_file_storage_dir = self.op.file_storage_dir
3443

    
3444
    # build the full file storage dir path
3445
    file_storage_dir = os.path.normpath(os.path.join(
3446
                                        self.sstore.GetFileStorageDir(),
3447
                                        string_file_storage_dir, instance))
3448

    
3449

    
3450
    disks = _GenerateDiskTemplate(self.cfg,
3451
                                  self.op.disk_template,
3452
                                  instance, pnode_name,
3453
                                  self.secondaries, self.op.disk_size,
3454
                                  self.op.swap_size,
3455
                                  file_storage_dir,
3456
                                  self.op.file_driver)
3457

    
3458
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3459
                            primary_node=pnode_name,
3460
                            memory=self.op.mem_size,
3461
                            vcpus=self.op.vcpus,
3462
                            nics=[nic], disks=disks,
3463
                            disk_template=self.op.disk_template,
3464
                            status=self.instance_status,
3465
                            network_port=network_port,
3466
                            kernel_path=self.op.kernel_path,
3467
                            initrd_path=self.op.initrd_path,
3468
                            hvm_boot_order=self.op.hvm_boot_order,
3469
                            )
3470

    
3471
    feedback_fn("* creating instance disks...")
3472
    if not _CreateDisks(self.cfg, iobj):
3473
      _RemoveDisks(iobj, self.cfg)
3474
      raise errors.OpExecError("Device creation failed, reverting...")
3475

    
3476
    feedback_fn("adding instance %s to cluster config" % instance)
3477

    
3478
    self.cfg.AddInstance(iobj)
3479

    
3480
    if self.op.wait_for_sync:
3481
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3482
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3483
      # make sure the disks are not degraded (still sync-ing is ok)
3484
      time.sleep(15)
3485
      feedback_fn("* checking mirrors status")
3486
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3487
    else:
3488
      disk_abort = False
3489

    
3490
    if disk_abort:
3491
      _RemoveDisks(iobj, self.cfg)
3492
      self.cfg.RemoveInstance(iobj.name)
3493
      raise errors.OpExecError("There are some degraded disks for"
3494
                               " this instance")
3495

    
3496
    feedback_fn("creating os for instance %s on node %s" %
3497
                (instance, pnode_name))
3498

    
3499
    if iobj.disk_template != constants.DT_DISKLESS:
3500
      if self.op.mode == constants.INSTANCE_CREATE:
3501
        feedback_fn("* running the instance OS create scripts...")
3502
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3503
          raise errors.OpExecError("could not add os for instance %s"
3504
                                   " on node %s" %
3505
                                   (instance, pnode_name))
3506

    
3507
      elif self.op.mode == constants.INSTANCE_IMPORT:
3508
        feedback_fn("* running the instance OS import scripts...")
3509
        src_node = self.op.src_node
3510
        src_image = self.src_image
3511
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3512
                                                src_node, src_image):
3513
          raise errors.OpExecError("Could not import os for instance"
3514
                                   " %s on node %s" %
3515
                                   (instance, pnode_name))
3516
      else:
3517
        # also checked in the prereq part
3518
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3519
                                     % self.op.mode)
3520

    
3521
    if self.op.start:
3522
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3523
      feedback_fn("* starting instance...")
3524
      if not rpc.call_instance_start(pnode_name, iobj, None):
3525
        raise errors.OpExecError("Could not start instance")
3526

    
3527

    
3528
class LUConnectConsole(NoHooksLU):
3529
  """Connect to an instance's console.
3530

3531
  This is somewhat special in that it returns the command line that
3532
  you need to run on the master node in order to connect to the
3533
  console.
3534

3535
  """
3536
  _OP_REQP = ["instance_name"]
3537

    
3538
  def CheckPrereq(self):
3539
    """Check prerequisites.
3540

3541
    This checks that the instance is in the cluster.
3542

3543
    """
3544
    instance = self.cfg.GetInstanceInfo(
3545
      self.cfg.ExpandInstanceName(self.op.instance_name))
3546
    if instance is None:
3547
      raise errors.OpPrereqError("Instance '%s' not known" %
3548
                                 self.op.instance_name)
3549
    self.instance = instance
3550

    
3551
  def Exec(self, feedback_fn):
3552
    """Connect to the console of an instance
3553

3554
    """
3555
    instance = self.instance
3556
    node = instance.primary_node
3557

    
3558
    node_insts = rpc.call_instance_list([node])[node]
3559
    if node_insts is False:
3560
      raise errors.OpExecError("Can't connect to node %s." % node)
3561

    
3562
    if instance.name not in node_insts:
3563
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3564

    
3565
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3566

    
3567
    hyper = hypervisor.GetHypervisor()
3568
    console_cmd = hyper.GetShellCommandForConsole(instance)
3569

    
3570
    # build ssh cmdline
3571
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3572

    
3573

    
3574
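# Illustrative note, not part of the original code: the value returned by
# LUConnectConsole.Exec above is the argv of an interactive ssh invocation
# that the client runs from the master node; it connects to the instance's
# primary node and executes the hypervisor's console command there, e.g. for
# Xen something along the lines of:
#   ["ssh", ..., "-t", "root@node1.example.com", "xm console instance1"]
# (the exact options come from ssh.SshRunner.BuildCmd; node and instance
# names here are made up).
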
class LUReplaceDisks(LogicalUnit):
3575
  """Replace the disks of an instance.
3576

3577
  """
3578
  HPATH = "mirrors-replace"
3579
  HTYPE = constants.HTYPE_INSTANCE
3580
  _OP_REQP = ["instance_name", "mode", "disks"]
3581

    
3582
  def _RunAllocator(self):
3583
    """Compute a new secondary node using an IAllocator.
3584

3585
    """
3586
    ial = IAllocator(self.cfg, self.sstore,
3587
                     mode=constants.IALLOCATOR_MODE_RELOC,
3588
                     name=self.op.instance_name,
3589
                     relocate_from=[self.sec_node])
3590

    
3591
    ial.Run(self.op.iallocator)
3592

    
3593
    if not ial.success:
3594
      raise errors.OpPrereqError("Can't compute nodes using"
3595
                                 " iallocator '%s': %s" % (self.op.iallocator,
3596
                                                           ial.info))
3597
    if len(ial.nodes) != ial.required_nodes:
3598
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3599
                                 " of nodes (%s), required %s" %
3600
                                 (len(ial.nodes), ial.required_nodes))
3601
    self.op.remote_node = ial.nodes[0]
3602
    logger.ToStdout("Selected new secondary for the instance: %s" %
3603
                    self.op.remote_node)
3604

    
3605
  def BuildHooksEnv(self):
3606
    """Build hooks env.
3607

3608
    This runs on the master, the primary and all the secondaries.
3609

3610
    """
3611
    env = {
3612
      "MODE": self.op.mode,
3613
      "NEW_SECONDARY": self.op.remote_node,
3614
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3615
      }
3616
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3617
    nl = [
3618
      self.sstore.GetMasterNode(),
3619
      self.instance.primary_node,
3620
      ]
3621
    if self.op.remote_node is not None:
3622
      nl.append(self.op.remote_node)
3623
    return env, nl, nl
3624

    
3625
  def CheckPrereq(self):
3626
    """Check prerequisites.
3627

3628
    This checks that the instance is in the cluster.
3629

3630
    """
3631
    if not hasattr(self.op, "remote_node"):
3632
      self.op.remote_node = None
3633

    
3634
    instance = self.cfg.GetInstanceInfo(
3635
      self.cfg.ExpandInstanceName(self.op.instance_name))
3636
    if instance is None:
3637
      raise errors.OpPrereqError("Instance '%s' not known" %
3638
                                 self.op.instance_name)
3639
    self.instance = instance
3640
    self.op.instance_name = instance.name
3641

    
3642
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3643
      raise errors.OpPrereqError("Instance's disk layout is not"
3644
                                 " network mirrored.")
3645

    
3646
    if len(instance.secondary_nodes) != 1:
3647
      raise errors.OpPrereqError("The instance has a strange layout,"
3648
                                 " expected one secondary but found %d" %
3649
                                 len(instance.secondary_nodes))
3650

    
3651
    self.sec_node = instance.secondary_nodes[0]
3652

    
3653
    ia_name = getattr(self.op, "iallocator", None)
3654
    if ia_name is not None:
3655
      if self.op.remote_node is not None:
3656
        raise errors.OpPrereqError("Give either the iallocator or the new"
3657
                                   " secondary, not both")
3658
      self.op.remote_node = self._RunAllocator()
3659

    
3660
    remote_node = self.op.remote_node
3661
    if remote_node is not None:
3662
      remote_node = self.cfg.ExpandNodeName(remote_node)
3663
      if remote_node is None:
3664
        raise errors.OpPrereqError("Node '%s' not known" %
3665
                                   self.op.remote_node)
3666
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3667
    else:
3668
      self.remote_node_info = None
3669
    if remote_node == instance.primary_node:
3670
      raise errors.OpPrereqError("The specified node is the primary node of"
3671
                                 " the instance.")
3672
    elif remote_node == self.sec_node:
3673
      if self.op.mode == constants.REPLACE_DISK_SEC:
3674
        # this is for DRBD8, where we can't execute the same mode of
3675
        # replacement as for drbd7 (no different port allocated)
3676
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3677
                                   " replacement")
3678
      # the user gave the current secondary, switch to
3679
      # 'no-replace-secondary' mode for drbd7
3680
      remote_node = None
3681
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3682
        self.op.mode != constants.REPLACE_DISK_ALL):
3683
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3684
                                 " disks replacement, not individual ones")
3685
    if instance.disk_template == constants.DT_DRBD8:
3686
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3687
          remote_node is not None):
3688
        # switch to replace secondary mode
3689
        self.op.mode = constants.REPLACE_DISK_SEC
3690

    
3691
      if self.op.mode == constants.REPLACE_DISK_ALL:
3692
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3693
                                   " secondary disk replacement, not"
3694
                                   " both at once")
3695
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3696
        if remote_node is not None:
3697
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3698
                                     " the secondary while doing a primary"
3699
                                     " node disk replacement")
3700
        self.tgt_node = instance.primary_node
3701
        self.oth_node = instance.secondary_nodes[0]
3702
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3703
        self.new_node = remote_node # this can be None, in which case
3704
                                    # we don't change the secondary
3705
        self.tgt_node = instance.secondary_nodes[0]
3706
        self.oth_node = instance.primary_node
3707
      else:
3708
        raise errors.ProgrammerError("Unhandled disk replace mode")
3709

    
3710
    for name in self.op.disks:
3711
      if instance.FindDisk(name) is None:
3712
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3713
                                   (name, instance.name))
3714
    self.op.remote_node = remote_node
3715

    
3716
  def _ExecRR1(self, feedback_fn):
3717
    """Replace the disks of an instance.
3718

3719
    """
3720
    instance = self.instance
3721
    iv_names = {}
3722
    # start of work
3723
    if self.op.remote_node is None:
3724
      remote_node = self.sec_node
3725
    else:
3726
      remote_node = self.op.remote_node
3727
    cfg = self.cfg
3728
    for dev in instance.disks:
3729
      size = dev.size
3730
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3731
      names = _GenerateUniqueNames(cfg, lv_names)
3732
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3733
                                       remote_node, size, names)
3734
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3735
      logger.Info("adding new mirror component on secondary for %s" %
3736
                  dev.iv_name)
3737
      #HARDCODE
3738
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3739
                                        new_drbd, False,
3740
                                        _GetInstanceInfoText(instance)):
3741
        raise errors.OpExecError("Failed to create new component on secondary"
3742
                                 " node %s. Full abort, cleanup manually!" %
3743
                                 remote_node)
3744

    
3745
      logger.Info("adding new mirror component on primary")
3746
      #HARDCODE
3747
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3748
                                      instance, new_drbd,
3749
                                      _GetInstanceInfoText(instance)):
3750
        # remove secondary dev
3751
        cfg.SetDiskID(new_drbd, remote_node)
3752
        rpc.call_blockdev_remove(remote_node, new_drbd)
3753
        raise errors.OpExecError("Failed to create volume on primary!"
3754
                                 " Full abort, cleanup manually!!")
3755

    
3756
      # the device exists now
3757
      # call the primary node to add the mirror to md
3758
      logger.Info("adding new mirror component to md")
3759
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3760
                                           [new_drbd]):
3761
        logger.Error("Can't add mirror component to md!")
3762
        cfg.SetDiskID(new_drbd, remote_node)
3763
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3764
          logger.Error("Can't rollback on secondary")
3765
        cfg.SetDiskID(new_drbd, instance.primary_node)
3766
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3767
          logger.Error("Can't rollback on primary")
3768
        raise errors.OpExecError("Full abort, cleanup manually!!")
3769

    
3770
      dev.children.append(new_drbd)
3771
      cfg.AddInstance(instance)
3772

    
3773
    # this can fail as the old devices are degraded and _WaitForSync
3774
    # does a combined result over all disks, so we don't check its
3775
    # return value
3776
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3777

    
3778
    # so check manually all the devices
3779
    for name in iv_names:
3780
      dev, child, new_drbd = iv_names[name]
3781
      cfg.SetDiskID(dev, instance.primary_node)
3782
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3783
      if is_degr:
3784
        raise errors.OpExecError("MD device %s is degraded!" % name)
3785
      cfg.SetDiskID(new_drbd, instance.primary_node)
3786
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3787
      if is_degr:
3788
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3789

    
3790
    for name in iv_names:
3791
      dev, child, new_drbd = iv_names[name]
3792
      logger.Info("remove mirror %s component" % name)
3793
      cfg.SetDiskID(dev, instance.primary_node)
3794
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3795
                                              dev, [child]):
3796
        logger.Error("Can't remove child from mirror, aborting"
3797
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3798
        continue
3799

    
3800
      for node in child.logical_id[:2]:
3801
        logger.Info("remove child device on %s" % node)
3802
        cfg.SetDiskID(child, node)
3803
        if not rpc.call_blockdev_remove(node, child):
3804
          logger.Error("Warning: failed to remove device from node %s,"
3805
                       " continuing operation." % node)
3806

    
3807
      dev.children.remove(child)
3808

    
3809
      cfg.AddInstance(instance)
3810

    
3811
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.
3813

3814
    The algorithm for replace is quite complicated:
3815
      - for each disk to be replaced:
3816
        - create new LVs on the target node with unique names
3817
        - detach old LVs from the drbd device
3818
        - rename old LVs to name_replaced.<time_t>
3819
        - rename new LVs to old LVs
3820
        - attach the new LVs (with the old names now) to the drbd device
3821
      - wait for sync across all devices
3822
      - for each modified disk:
3823
        - remove old LVs (which have the name name_replaced.<time_t>)
3824

3825
    Failures are not very well handled.
3826

3827
    """
3828
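    # Illustrative note, not part of the original code: step 4 below swaps the
    # storage purely via LV renames, schematically (names made up):
    #   old data LV: <id1>.sda_data -> <id1>.sda_data_replaced-<time_t>
    #   new data LV: <id2>.sda_data -> <id1>.sda_data
    # so the new LVs end up under exactly the names the drbd device expects.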
    steps_total = 6
3829
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3830
    instance = self.instance
3831
    iv_names = {}
3832
    vgname = self.cfg.GetVGName()
3833
    # start of work
3834
    cfg = self.cfg
3835
    tgt_node = self.tgt_node
3836
    oth_node = self.oth_node
3837

    
3838
    # Step: check device activation
3839
    self.proc.LogStep(1, steps_total, "check device existence")
3840
    info("checking volume groups")
3841
    my_vg = cfg.GetVGName()
3842
    results = rpc.call_vg_list([oth_node, tgt_node])
3843
    if not results:
3844
      raise errors.OpExecError("Can't list volume groups on the nodes")
3845
    for node in oth_node, tgt_node:
3846
      res = results.get(node, False)
3847
      if not res or my_vg not in res:
3848
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3849
                                 (my_vg, node))
3850
    for dev in instance.disks:
3851
      if not dev.iv_name in self.op.disks:
3852
        continue
3853
      for node in tgt_node, oth_node:
3854
        info("checking %s on %s" % (dev.iv_name, node))
3855
        cfg.SetDiskID(dev, node)
3856
        if not rpc.call_blockdev_find(node, dev):
3857
          raise errors.OpExecError("Can't find device %s on node %s" %
3858
                                   (dev.iv_name, node))
3859

    
3860
    # Step: check other node consistency
3861
    self.proc.LogStep(2, steps_total, "check peer consistency")
3862
    for dev in instance.disks:
3863
      if not dev.iv_name in self.op.disks:
3864
        continue
3865
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3866
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3867
                                   oth_node==instance.primary_node):
3868
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3869
                                 " to replace disks on this node (%s)" %
3870
                                 (oth_node, tgt_node))
3871

    
3872
    # Step: create new storage
3873
    self.proc.LogStep(3, steps_total, "allocate new storage")
3874
    for dev in instance.disks:
3875
      if not dev.iv_name in self.op.disks:
3876
        continue
3877
      size = dev.size
3878
      cfg.SetDiskID(dev, tgt_node)
3879
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3880
      names = _GenerateUniqueNames(cfg, lv_names)
3881
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3882
                             logical_id=(vgname, names[0]))
3883
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3884
                             logical_id=(vgname, names[1]))
3885
      new_lvs = [lv_data, lv_meta]
3886
      old_lvs = dev.children
3887
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3888
      info("creating new local storage on %s for %s" %
3889
           (tgt_node, dev.iv_name))
3890
      # since we *always* want to create this LV, we use the
3891
      # _Create...OnPrimary (which forces the creation), even if we
3892
      # are talking about the secondary node
3893
      for new_lv in new_lvs:
3894
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3895
                                        _GetInstanceInfoText(instance)):
3896
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3897
                                   " node '%s'" %
3898
                                   (new_lv.logical_id[1], tgt_node))
3899

    
3900
    # Step: for each lv, detach+rename*2+attach
3901
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3902
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3903
      info("detaching %s drbd from local storage" % dev.iv_name)
3904
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3905
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3906
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3907
      #dev.children = []
3908
      #cfg.Update(instance)
3909

    
3910
      # ok, we created the new LVs, so now we know we have the needed
3911
      # storage; as such, we proceed on the target node to rename
3912
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3913
      # using the assumption that logical_id == physical_id (which in
3914
      # turn is the unique_id on that node)
3915

    
3916
      # FIXME(iustin): use a better name for the replaced LVs
3917
      temp_suffix = int(time.time())
3918
      ren_fn = lambda d, suff: (d.physical_id[0],
3919
                                d.physical_id[1] + "_replaced-%s" % suff)
3920
      # build the rename list based on what LVs exist on the node
3921
      rlist = []
3922
      for to_ren in old_lvs:
3923
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3924
        if find_res is not None: # device exists
3925
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3926

    
3927
      info("renaming the old LVs on the target node")
3928
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3929
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3930
      # now we rename the new LVs to the old LVs
3931
      info("renaming the new LVs on the target node")
3932
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3933
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3934
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3935

    
3936
      for old, new in zip(old_lvs, new_lvs):
3937
        new.logical_id = old.logical_id
3938
        cfg.SetDiskID(new, tgt_node)
3939

    
3940
      for disk in old_lvs:
3941
        disk.logical_id = ren_fn(disk, temp_suffix)
3942
        cfg.SetDiskID(disk, tgt_node)
3943

    
3944
      # now that the new lvs have the old name, we can add them to the device
3945
      info("adding new mirror component on %s" % tgt_node)
3946
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3947
        for new_lv in new_lvs:
3948
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3949
            warning("Can't rollback device %s" % new_lv, hint="manually cleanup unused"
3950
                    " logical volumes")
3951
        raise errors.OpExecError("Can't add local storage to drbd")
3952

    
3953
      dev.children = new_lvs
3954
      cfg.Update(instance)
3955

    
3956
    # Step: wait for sync
3957

    
3958
    # this can fail as the old devices are degraded and _WaitForSync
3959
    # does a combined result over all disks, so we don't check its
3960
    # return value
3961
    self.proc.LogStep(5, steps_total, "sync devices")
3962
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3963

    
3964
    # so check manually all the devices
3965
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3966
      cfg.SetDiskID(dev, instance.primary_node)
3967
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3968
      if is_degr:
3969
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3970

    
3971
    # Step: remove old storage
3972
    self.proc.LogStep(6, steps_total, "removing old storage