#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []
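
# Editor's illustrative sketch (not part of the original module): a minimal
# LogicalUnit subclass wired up the way the base class docstring prescribes.
# The opcode and its "message" field are hypothetical; only the structure
# (_OP_REQP, CheckPrereq, Exec) mirrors the real contract.
#
#   class LUEcho(NoHooksLU):
#     _OP_REQP = ["message"]
#
#     def CheckPrereq(self):
#       # canonicalize opcode parameters; raise errors.OpPrereqError if invalid
#       self.op.message = str(self.op.message)
#
#     def Exec(self, feedback_fn):
#       feedback_fn(self.op.message)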


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)
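
# Editor's note (illustrative, not in the original file): a typical caller
# does something like
#   self.wanted = _GetWantedNodes(self, self.op.nodes)
# in CheckPrereq, getting back the sorted, fully-expanded node names, or the
# whole node list when the passed list is empty.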


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
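
# Editor's illustrative sketch (not in the original module): for a
# hypothetical single-NIC instance the dict built above would look roughly
# like
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1, "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33"}
# The hooks runner is what later prefixes each key with "GANETI_".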


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
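
# Editor's note (illustrative, not in the original file): typical usage is
#   vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
#   if vgstatus:
#     raise errors.OpPrereqError(vgstatus)
# i.e. a None return means the volume group exists and is at least 20 GiB.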


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
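
  # Editor's illustrative sketch (not in the original module): given node data
  # such as
  #   node_info["node2"] = {"mfree": 1024,
  #                         "sinst-by-pnode": {"node1": ["inst1", "inst2"]}}
  # the loop above sums the memory of inst1 and inst2 (the instances that
  # would fail over from node1 to node2) and flags node2 when that sum exceeds
  # its free memory; all node and instance names here are hypothetical.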

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
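
# Editor's note (illustrative assumption, not in the original file): for a
# mirrored disk object whose .children are two logical volumes (dev_type
# constants.LD_LV), the recursion above returns True as soon as it reaches
# either child; a disk tree containing no LD_LV device anywhere falls through
# to the final comparison and returns False.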


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
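
# Editor's note (assumption, not stated in this file): the magic indices 5 and
# 6 used above select fields of the status tuple returned by
# rpc.call_blockdev_find, position 5 presumably being the overall is_degraded
# flag and position 6 the local-disk (ldisk) status that the docstring
# describes.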
1263

    
1264

    
1265
class LUDiagnoseOS(NoHooksLU):
1266
  """Logical unit for OS diagnose/query.
1267

1268
  """
1269
  _OP_REQP = ["output_fields", "names"]
1270

    
1271
  def CheckPrereq(self):
1272
    """Check prerequisites.
1273

1274
    This always succeeds, since this is a pure query LU.
1275

1276
    """
1277
    if self.op.names:
1278
      raise errors.OpPrereqError("Selective OS query not supported")
1279

    
1280
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1281
    _CheckOutputFields(static=[],
1282
                       dynamic=self.dynamic_fields,
1283
                       selected=self.op.output_fields)
1284

    
1285
  @staticmethod
1286
  def _DiagnoseByOS(node_list, rlist):
1287
    """Remaps a per-node return list into an a per-os per-node dictionary
1288

1289
      Args:
1290
        node_list: a list with the names of all nodes
1291
        rlist: a map with node names as keys and OS objects as values
1292

1293
      Returns:
1294
        map: a map with osnames as keys and as value another map, with
1295
             nodes as
1296
             keys and list of OS objects as values
1297
             e.g. {"debian-etch": {"node1": [<object>,...],
1298
                                   "node2": [<object>,]}
1299
                  }
1300

1301
    """
1302
    all_os = {}
1303
    for node_name, nr in rlist.iteritems():
1304
      if not nr:
1305
        continue
1306
      for os in nr:
1307
        if os.name not in all_os:
1308
          # build a list of nodes for this os containing empty lists
1309
          # for each node in node_list
1310
          all_os[os.name] = {}
1311
          for nname in node_list:
1312
            all_os[os.name][nname] = []
1313
        all_os[os.name][node_name].append(os)
1314
    return all_os
1315

    
1316
  def Exec(self, feedback_fn):
1317
    """Compute the list of OSes.
1318

1319
    """
1320
    node_list = self.cfg.GetNodeList()
1321
    node_data = rpc.call_os_diagnose(node_list)
1322
    if node_data == False:
1323
      raise errors.OpExecError("Can't gather the list of OSes")
1324
    pol = self._DiagnoseByOS(node_list, node_data)
1325
    output = []
1326
    for os_name, os_data in pol.iteritems():
1327
      row = []
1328
      for field in self.op.output_fields:
1329
        if field == "name":
1330
          val = os_name
1331
        elif field == "valid":
1332
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1333
        elif field == "node_status":
1334
          val = {}
1335
          for node_name, nos_list in os_data.iteritems():
1336
            val[node_name] = [(v.status, v.path) for v in nos_list]
1337
        else:
1338
          raise errors.ParameterError(field)
1339
        row.append(val)
1340
      output.append(row)
1341

    
1342
    return output
1343

    
1344

    
1345
class LURemoveNode(LogicalUnit):
1346
  """Logical unit for removing a node.
1347

1348
  """
1349
  HPATH = "node-remove"
1350
  HTYPE = constants.HTYPE_NODE
1351
  _OP_REQP = ["node_name"]
1352

    
1353
  def BuildHooksEnv(self):
1354
    """Build hooks env.
1355

1356
    This doesn't run on the target node in the pre phase as a failed
1357
    node would not allows itself to run.
1358

1359
    """
1360
    env = {
1361
      "OP_TARGET": self.op.node_name,
1362
      "NODE_NAME": self.op.node_name,
1363
      }
1364
    all_nodes = self.cfg.GetNodeList()
1365
    all_nodes.remove(self.op.node_name)
1366
    return env, all_nodes, all_nodes
1367

    
1368
  def CheckPrereq(self):
1369
    """Check prerequisites.
1370

1371
    This checks:
1372
     - the node exists in the configuration
1373
     - it does not have primary or secondary instances
1374
     - it's not the master
1375

1376
    Any errors are signalled by raising errors.OpPrereqError.
1377

1378
    """
1379
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1380
    if node is None:
1381
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1382

    
1383
    instance_list = self.cfg.GetInstanceList()
1384

    
1385
    masternode = self.sstore.GetMasterNode()
1386
    if node.name == masternode:
1387
      raise errors.OpPrereqError("Node is the master node,"
1388
                                 " you need to failover first.")
1389

    
1390
    for instance_name in instance_list:
1391
      instance = self.cfg.GetInstanceInfo(instance_name)
1392
      if node.name == instance.primary_node:
1393
        raise errors.OpPrereqError("Instance %s still running on the node,"
1394
                                   " please remove first." % instance_name)
1395
      if node.name in instance.secondary_nodes:
1396
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1397
                                   " please remove first." % instance_name)
1398
    self.op.node_name = node.name
1399
    self.node = node
1400

    
1401
  def Exec(self, feedback_fn):
1402
    """Removes the node from the cluster.
1403

1404
    """
1405
    node = self.node
1406
    logger.Info("stopping the node daemon and removing configs from node %s" %
1407
                node.name)
1408

    
1409
    rpc.call_node_leave_cluster(node.name)
1410

    
1411
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1412

    
1413
    logger.Info("Removing node %s from config" % node.name)
1414

    
1415
    self.cfg.RemoveNode(node.name)
1416

    
1417
    _RemoveHostFromEtcHosts(node.name)
1418

    
1419

    
1420
class LUQueryNodes(NoHooksLU):
1421
  """Logical unit for querying nodes.
1422

1423
  """
1424
  _OP_REQP = ["output_fields", "names"]
1425

    
1426
  def CheckPrereq(self):
1427
    """Check prerequisites.
1428

1429
    This checks that the fields required are valid output fields.
1430

1431
    """
1432
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1433
                                     "mtotal", "mnode", "mfree",
1434
                                     "bootid"])
1435

    
1436
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1437
                               "pinst_list", "sinst_list",
1438
                               "pip", "sip"],
1439
                       dynamic=self.dynamic_fields,
1440
                       selected=self.op.output_fields)
1441

    
1442
    self.wanted = _GetWantedNodes(self, self.op.names)
1443

    
1444
  def Exec(self, feedback_fn):
1445
    """Computes the list of nodes and their attributes.
1446

1447
    """
1448
    nodenames = self.wanted
1449
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1450

    
1451
    # begin data gathering
1452

    
1453
    if self.dynamic_fields.intersection(self.op.output_fields):
1454
      live_data = {}
1455
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1456
      for name in nodenames:
1457
        nodeinfo = node_data.get(name, None)
1458
        if nodeinfo:
1459
          live_data[name] = {
1460
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1461
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1462
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1463
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1464
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1465
            "bootid": nodeinfo['bootid'],
1466
            }
1467
        else:
1468
          live_data[name] = {}
1469
    else:
1470
      live_data = dict.fromkeys(nodenames, {})
1471

    
1472
    node_to_primary = dict([(name, set()) for name in nodenames])
1473
    node_to_secondary = dict([(name, set()) for name in nodenames])
1474

    
1475
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1476
                             "sinst_cnt", "sinst_list"))
1477
    if inst_fields & frozenset(self.op.output_fields):
1478
      instancelist = self.cfg.GetInstanceList()
1479

    
1480
      for instance_name in instancelist:
1481
        inst = self.cfg.GetInstanceInfo(instance_name)
1482
        if inst.primary_node in node_to_primary:
1483
          node_to_primary[inst.primary_node].add(inst.name)
1484
        for secnode in inst.secondary_nodes:
1485
          if secnode in node_to_secondary:
1486
            node_to_secondary[secnode].add(inst.name)
1487

    
1488
    # end data gathering
1489

    
1490
    output = []
1491
    for node in nodelist:
1492
      node_output = []
1493
      for field in self.op.output_fields:
1494
        if field == "name":
1495
          val = node.name
1496
        elif field == "pinst_list":
1497
          val = list(node_to_primary[node.name])
1498
        elif field == "sinst_list":
1499
          val = list(node_to_secondary[node.name])
1500
        elif field == "pinst_cnt":
1501
          val = len(node_to_primary[node.name])
1502
        elif field == "sinst_cnt":
1503
          val = len(node_to_secondary[node.name])
1504
        elif field == "pip":
1505
          val = node.primary_ip
1506
        elif field == "sip":
1507
          val = node.secondary_ip
1508
        elif field in self.dynamic_fields:
1509
          val = live_data[node.name].get(field, None)
1510
        else:
1511
          raise errors.ParameterError(field)
1512
        node_output.append(val)
1513
      output.append(node_output)
1514

    
1515
    return output
1516

    
1517

    
1518
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up the inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become the new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")



class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


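# Editorial sketch (not part of the original module): the helper above returns
# a (disks_ok, device_info) pair, where device_info is a list of
#   (primary_node, iv_name, node_device)
# tuples. Under assumed names, a DRBD-backed instance might yield:
#   (True, [("node1.example.com", "sda", "/dev/drbd0"),
#           ("node1.example.com", "sdb", "/dev/drbd1")])
# Callers such as the activate-disks LU pass this mapping back to the client;
# the exact device strings are an assumption, not taken from this file.
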
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true; otherwise they cause a False return value.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


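# Illustrative sketch only (added editorially, not part of the original file):
# how a LU's CheckPrereq typically uses the memory check above before starting
# or failing over an instance. The function is unused by the module and its
# name is an assumption.
def _ExampleMemoryPrereq(cfg, instance):
  """Raise OpPrereqError unless the primary node can host the instance."""
  _CheckNodeFreeMemory(cfg, instance.primary_node,
                       "starting instance %s" % instance.name,
                       instance.memory)
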
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridge existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridge existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
3015
  """Compute that text that should be added to the disk's metadata.
3016

3017
  """
3018
  return "originstname+%s" % instance.name
3019

    
3020

    
3021
def _CreateDisks(cfg, instance):
3022
  """Create all disks for an instance.
3023

3024
  This abstracts away some work from AddInstance.
3025

3026
  Args:
3027
    instance: the instance object
3028

3029
  Returns:
3030
    True or False showing the success of the creation process
3031

3032
  """
3033
  info = _GetInstanceInfoText(instance)
3034

    
3035
  if instance.disk_template == constants.DT_FILE:
3036
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3037
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3038
                                              file_storage_dir)
3039

    
3040
    if not result:
3041
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3042
      return False
3043

    
3044
    if not result[0]:
3045
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3046
      return False
3047

    
3048
  for device in instance.disks:
3049
    logger.Info("creating volume %s for instance %s" %
3050
                (device.iv_name, instance.name))
3051
    #HARDCODE
3052
    for secondary_node in instance.secondary_nodes:
3053
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3054
                                        device, False, info):
3055
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3056
                     (device.iv_name, device, secondary_node))
3057
        return False
3058
    #HARDCODE
3059
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3060
                                    instance, device, info):
3061
      logger.Error("failed to create volume %s on primary!" %
3062
                   device.iv_name)
3063
      return False
3064

    
3065
  return True
3066

    
3067

    
3068
def _RemoveDisks(instance, cfg):
3069
  """Remove all disks for an instance.
3070

3071
  This abstracts away some work from `AddInstance()` and
3072
  `RemoveInstance()`. Note that in case some of the devices couldn't
3073
  be removed, the removal will continue with the other ones (compare
3074
  with `_CreateDisks()`).
3075

3076
  Args:
3077
    instance: the instance object
3078

3079
  Returns:
3080
    True or False showing the success of the removal process
3081

3082
  """
3083
  logger.Info("removing block devices for instance %s" % instance.name)
3084

    
3085
  result = True
3086
  for device in instance.disks:
3087
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3088
      cfg.SetDiskID(disk, node)
3089
      if not rpc.call_blockdev_remove(node, disk):
3090
        logger.Error("could not remove block device %s on node %s,"
3091
                     " continuing anyway" %
3092
                     (device.iv_name, node))
3093
        result = False
3094

    
3095
  if instance.disk_template == constants.DT_FILE:
3096
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3097
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3098
                                            file_storage_dir):
3099
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3100
      result = False
3101

    
3102
  return result
3103

    
3104

    
3105
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3106
  """Compute disk size requirements in the volume group
3107

3108
  This is currently hard-coded for the two-drive layout.
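
  For example, with a 10240 MB disk and 4096 MB of swap, a plain
  instance needs 14336 MB of free space in the volume group, while a
  drbd instance needs 14592 MB (the extra 256 MB cover the DRBD
  metadata volumes).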
3109

3110
  """
3111
  # Required free disk space as a function of disk and swap space
3112
  req_size_dict = {
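    # None means the disk template does not consume volume group space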
3113
    constants.DT_DISKLESS: None,
3114
    constants.DT_PLAIN: disk_size + swap_size,
3115
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
3116
    constants.DT_DRBD8: disk_size + swap_size + 256,
3117
    constants.DT_FILE: None,
3118
  }
3119

    
3120
  if disk_template not in req_size_dict:
3121
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3122
                                 " is unknown" %  disk_template)
3123

    
3124
  return req_size_dict[disk_template]
3125

    
3126

    
3127
class LUCreateInstance(LogicalUnit):
3128
  """Create an instance.
3129

3130
  """
3131
  HPATH = "instance-add"
3132
  HTYPE = constants.HTYPE_INSTANCE
3133
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3134
              "disk_template", "swap_size", "mode", "start", "vcpus",
3135
              "wait_for_sync", "ip_check", "mac"]
3136

    
3137
  def _RunAllocator(self):
3138
    """Run the allocator based on input opcode.
3139

3140
    """
3141
    disks = [{"size": self.op.disk_size, "mode": "w"},
3142
             {"size": self.op.swap_size, "mode": "w"}]
3143
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3144
             "bridge": self.op.bridge}]
3145
    ial = IAllocator(self.cfg, self.sstore,
3146
                     name=self.op.instance_name,
3147
                     disk_template=self.op.disk_template,
3148
                     tags=[],
3149
                     os=self.op.os_type,
3150
                     vcpus=self.op.vcpus,
3151
                     mem_size=self.op.mem_size,
3152
                     disks=disks,
3153
                     nics=nics,
3154
                     mode=constants.IALLOCATOR_MODE_ALLOC)
3155

    
3156
    ial.Run(self.op.iallocator)
3157

    
3158
    if not ial.success:
3159
      raise errors.OpPrereqError("Can't compute nodes using"
3160
                                 " iallocator '%s': %s" % (self.op.iallocator,
3161
                                                           ial.info))
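    # one node for the primary, plus a second one for net-mirrored templates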
3162
    req_nodes = 1
3163
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3164
      req_nodes += 1
3165

    
3166
    if len(ial.nodes) != req_nodes:
3167
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3168
                                 " of nodes (%s), required %s" %
3169
                                 (self.op.iallocator, len(ial.nodes),
                                  req_nodes))
3170
    self.op.pnode = ial.nodes[0]
3171
    logger.ToStdout("Selected nodes for the instance: %s" %
3172
                    (", ".join(ial.nodes),))
3173
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3174
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3175
    if req_nodes == 2:
3176
      self.op.snode = ial.nodes[1]
3177

    
3178
  def BuildHooksEnv(self):
3179
    """Build hooks env.
3180

3181
    This runs on master, primary and secondary nodes of the instance.
3182

3183
    """
3184
    env = {
3185
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3186
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3187
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3188
      "INSTANCE_ADD_MODE": self.op.mode,
3189
      }
3190
    if self.op.mode == constants.INSTANCE_IMPORT:
3191
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3192
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3193
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3194

    
3195
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3196
      primary_node=self.op.pnode,
3197
      secondary_nodes=self.secondaries,
3198
      status=self.instance_status,
3199
      os_type=self.op.os_type,
3200
      memory=self.op.mem_size,
3201
      vcpus=self.op.vcpus,
3202
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3203
    ))
3204

    
3205
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3206
          self.secondaries)
3207
    return env, nl, nl
3208

    
3209

    
3210
  def CheckPrereq(self):
3211
    """Check prerequisites.
3212

3213
    """
3214
    # set optional parameters to none if they don't exist
3215
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3216
                 "iallocator"]:
3217
      if not hasattr(self.op, attr):
3218
        setattr(self.op, attr, None)
3219

    
3220
    if self.op.mode not in (constants.INSTANCE_CREATE,
3221
                            constants.INSTANCE_IMPORT):
3222
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3223
                                 self.op.mode)
3224

    
3225
    if (not self.cfg.GetVGName() and
3226
        self.op.disk_template not in constants.DTS_NOT_LVM):
3227
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3228
                                 " instances")
3229

    
3230
    if self.op.mode == constants.INSTANCE_IMPORT:
3231
      src_node = getattr(self.op, "src_node", None)
3232
      src_path = getattr(self.op, "src_path", None)
3233
      if src_node is None or src_path is None:
3234
        raise errors.OpPrereqError("Importing an instance requires source"
3235
                                   " node and path options")
3236
      src_node_full = self.cfg.ExpandNodeName(src_node)
3237
      if src_node_full is None:
3238
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3239
      self.op.src_node = src_node = src_node_full
3240

    
3241
      if not os.path.isabs(src_path):
3242
        raise errors.OpPrereqError("The source path must be absolute")
3243

    
3244
      export_info = rpc.call_export_info(src_node, src_path)
3245

    
3246
      if not export_info:
3247
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3248

    
3249
      if not export_info.has_section(constants.INISECT_EXP):
3250
        raise errors.ProgrammerError("Corrupted export config")
3251

    
3252
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3253
      if (int(ei_version) != constants.EXPORT_VERSION):
3254
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3255
                                   (ei_version, constants.EXPORT_VERSION))
3256

    
3257
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3258
        raise errors.OpPrereqError("Can't import instance with more than"
3259
                                   " one data disk")
3260

    
3261
      # FIXME: are the old os-es, disk sizes, etc. useful?
3262
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3263
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3264
                                                         'disk0_dump'))
3265
      self.src_image = diskimage
3266
    else: # INSTANCE_CREATE
3267
      if getattr(self.op, "os_type", None) is None:
3268
        raise errors.OpPrereqError("No guest OS specified")
3269

    
3270
    #### instance parameters check
3271

    
3272
    # disk template and mirror node verification
3273
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3274
      raise errors.OpPrereqError("Invalid disk template name")
3275

    
3276
    # instance name verification
3277
    hostname1 = utils.HostInfo(self.op.instance_name)
3278

    
3279
    self.op.instance_name = instance_name = hostname1.name
3280
    instance_list = self.cfg.GetInstanceList()
3281
    if instance_name in instance_list:
3282
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3283
                                 instance_name)
3284

    
3285
    # ip validity checks
3286
    ip = getattr(self.op, "ip", None)
3287
    if ip is None or ip.lower() == "none":
3288
      inst_ip = None
3289
    elif ip.lower() == "auto":
3290
      inst_ip = hostname1.ip
3291
    else:
3292
      if not utils.IsValidIP(ip):
3293
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3294
                                   " like a valid IP" % ip)
3295
      inst_ip = ip
3296
    self.inst_ip = self.op.ip = inst_ip
3297

    
3298
    if self.op.start and not self.op.ip_check:
3299
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3300
                                 " adding an instance in start mode")
3301

    
3302
    if self.op.ip_check:
3303
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3304
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3305
                                   (hostname1.ip, instance_name))
3306

    
3307
    # MAC address verification
3308
    if self.op.mac != "auto":
3309
      if not utils.IsValidMac(self.op.mac.lower()):
3310
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3311
                                   self.op.mac)
3312

    
3313
    # bridge verification
3314
    bridge = getattr(self.op, "bridge", None)
3315
    if bridge is None:
3316
      self.op.bridge = self.cfg.GetDefBridge()
3317
    else:
3318
      self.op.bridge = bridge
3319

    
3320
    # boot order verification
3321
    if self.op.hvm_boot_order is not None:
3322
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3323
        raise errors.OpPrereqError("invalid boot order specified,"
3324
                                   " must be one or more of [acdn]")
3325
    # file storage checks
3326
    if (self.op.file_driver and
3327
        not self.op.file_driver in constants.FILE_DRIVER):
3328
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3329
                                 self.op.file_driver)
3330

    
3331
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3332
        raise errors.OpPrereqError("File storage directory not a relative"
3333
                                   " path")
3334
    #### allocator run
3335

    
3336
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3337
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3338
                                 " node must be given")
3339

    
3340
    if self.op.iallocator is not None:
3341
      self._RunAllocator()
3342

    
3343
    #### node related checks
3344

    
3345
    # check primary node
3346
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3347
    if pnode is None:
3348
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3349
                                 self.op.pnode)
3350
    self.op.pnode = pnode.name
3351
    self.pnode = pnode
3352
    self.secondaries = []
3353

    
3354
    # mirror node verification
3355
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3356
      if getattr(self.op, "snode", None) is None:
3357
        raise errors.OpPrereqError("The networked disk templates need"
3358
                                   " a mirror node")
3359

    
3360
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3361
      if snode_name is None:
3362
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3363
                                   self.op.snode)
3364
      elif snode_name == pnode.name:
3365
        raise errors.OpPrereqError("The secondary node cannot be"
3366
                                   " the primary node.")
3367
      self.secondaries.append(snode_name)
3368

    
3369
    req_size = _ComputeDiskSize(self.op.disk_template,
3370
                                self.op.disk_size, self.op.swap_size)
3371

    
3372
    # Check lv size requirements
3373
    if req_size is not None:
3374
      nodenames = [pnode.name] + self.secondaries
3375
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3376
      for node in nodenames:
3377
        info = nodeinfo.get(node, None)
3378
        if not info:
3379
          raise errors.OpPrereqError("Cannot get current information"
3380
                                     " from node '%s'" % nodeinfo)
3381
        vg_free = info.get('vg_free', None)
3382
        if not isinstance(vg_free, int):
3383
          raise errors.OpPrereqError("Can't compute free disk space on"
3384
                                     " node %s" % node)
3385
        if req_size > info['vg_free']:
3386
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3387
                                     " %d MB available, %d MB required" %
3388
                                     (node, info['vg_free'], req_size))
3389

    
3390
    # os verification
3391
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3392
    if not os_obj:
3393
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3394
                                 " primary node"  % self.op.os_type)
3395

    
3396
    if self.op.kernel_path == constants.VALUE_NONE:
3397
      raise errors.OpPrereqError("Can't set instance kernel to none")
3398

    
3399

    
3400
    # bridge check on primary node
3401
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3402
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3403
                                 " destination node '%s'" %
3404
                                 (self.op.bridge, pnode.name))
3405

    
3406
    if self.op.start:
3407
      self.instance_status = 'up'
3408
    else:
3409
      self.instance_status = 'down'
3410

    
3411
  def Exec(self, feedback_fn):
3412
    """Create and add the instance to the cluster.
3413

3414
    """
3415
    instance = self.op.instance_name
3416
    pnode_name = self.pnode.name
3417

    
3418
    if self.op.mac == "auto":
3419
      mac_address = self.cfg.GenerateMAC()
3420
    else:
3421
      mac_address = self.op.mac
3422

    
3423
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3424
    if self.inst_ip is not None:
3425
      nic.ip = self.inst_ip
3426

    
3427
    ht_kind = self.sstore.GetHypervisorType()
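    # some hypervisor types need a network port reserved for the instance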
3428
    if ht_kind in constants.HTS_REQ_PORT:
3429
      network_port = self.cfg.AllocatePort()
3430
    else:
3431
      network_port = None
3432

    
3433
    # this is needed because os.path.join does not accept None arguments
3434
    if self.op.file_storage_dir is None:
3435
      string_file_storage_dir = ""
3436
    else:
3437
      string_file_storage_dir = self.op.file_storage_dir
3438

    
3439
    # build the full file storage dir path
3440
    file_storage_dir = os.path.normpath(os.path.join(
3441
                                        self.sstore.GetFileStorageDir(),
3442
                                        string_file_storage_dir, instance))
3443

    
3444

    
3445
    disks = _GenerateDiskTemplate(self.cfg,
3446
                                  self.op.disk_template,
3447
                                  instance, pnode_name,
3448
                                  self.secondaries, self.op.disk_size,
3449
                                  self.op.swap_size,
3450
                                  file_storage_dir,
3451
                                  self.op.file_driver)
3452

    
3453
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3454
                            primary_node=pnode_name,
3455
                            memory=self.op.mem_size,
3456
                            vcpus=self.op.vcpus,
3457
                            nics=[nic], disks=disks,
3458
                            disk_template=self.op.disk_template,
3459
                            status=self.instance_status,
3460
                            network_port=network_port,
3461
                            kernel_path=self.op.kernel_path,
3462
                            initrd_path=self.op.initrd_path,
3463
                            hvm_boot_order=self.op.hvm_boot_order,
3464
                            )
3465

    
3466
    feedback_fn("* creating instance disks...")
3467
    if not _CreateDisks(self.cfg, iobj):
3468
      _RemoveDisks(iobj, self.cfg)
3469
      raise errors.OpExecError("Device creation failed, reverting...")
3470

    
3471
    feedback_fn("adding instance %s to cluster config" % instance)
3472

    
3473
    self.cfg.AddInstance(iobj)
3474

    
3475
    if self.op.wait_for_sync:
3476
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3477
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3478
      # make sure the disks are not degraded (still sync-ing is ok)
3479
      time.sleep(15)
3480
      feedback_fn("* checking mirrors status")
3481
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3482
    else:
3483
      disk_abort = False
3484

    
3485
    if disk_abort:
3486
      _RemoveDisks(iobj, self.cfg)
3487
      self.cfg.RemoveInstance(iobj.name)
3488
      raise errors.OpExecError("There are some degraded disks for"
3489
                               " this instance")
3490

    
3491
    feedback_fn("creating os for instance %s on node %s" %
3492
                (instance, pnode_name))
3493

    
3494
    if iobj.disk_template != constants.DT_DISKLESS:
3495
      if self.op.mode == constants.INSTANCE_CREATE:
3496
        feedback_fn("* running the instance OS create scripts...")
3497
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3498
          raise errors.OpExecError("could not add os for instance %s"
3499
                                   " on node %s" %
3500
                                   (instance, pnode_name))
3501

    
3502
      elif self.op.mode == constants.INSTANCE_IMPORT:
3503
        feedback_fn("* running the instance OS import scripts...")
3504
        src_node = self.op.src_node
3505
        src_image = self.src_image
3506
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3507
                                                src_node, src_image):
3508
          raise errors.OpExecError("Could not import os for instance"
3509
                                   " %s on node %s" %
3510
                                   (instance, pnode_name))
3511
      else:
3512
        # also checked in the prereq part
3513
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3514
                                     % self.op.mode)
3515

    
3516
    if self.op.start:
3517
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3518
      feedback_fn("* starting instance...")
3519
      if not rpc.call_instance_start(pnode_name, iobj, None):
3520
        raise errors.OpExecError("Could not start instance")
3521

    
3522

    
3523
class LUConnectConsole(NoHooksLU):
3524
  """Connect to an instance's console.
3525

3526
  This is somewhat special in that it returns the command line that
3527
  you need to run on the master node in order to connect to the
3528
  console.
3529

3530
  """
3531
  _OP_REQP = ["instance_name"]
3532

    
3533
  def CheckPrereq(self):
3534
    """Check prerequisites.
3535

3536
    This checks that the instance is in the cluster.
3537

3538
    """
3539
    instance = self.cfg.GetInstanceInfo(
3540
      self.cfg.ExpandInstanceName(self.op.instance_name))
3541
    if instance is None:
3542
      raise errors.OpPrereqError("Instance '%s' not known" %
3543
                                 self.op.instance_name)
3544
    self.instance = instance
3545

    
3546
  def Exec(self, feedback_fn):
3547
    """Connect to the console of an instance
3548

3549
    """
3550
    instance = self.instance
3551
    node = instance.primary_node
3552

    
3553
    node_insts = rpc.call_instance_list([node])[node]
3554
    if node_insts is False:
3555
      raise errors.OpExecError("Can't connect to node %s." % node)
3556

    
3557
    if instance.name not in node_insts:
3558
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3559

    
3560
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3561

    
3562
    hyper = hypervisor.GetHypervisor()
3563
    console_cmd = hyper.GetShellCommandForConsole(instance)
3564

    
3565
    # build ssh cmdline
3566
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3567

    
3568

    
3569
class LUReplaceDisks(LogicalUnit):
3570
  """Replace the disks of an instance.
3571

3572
  """
3573
  HPATH = "mirrors-replace"
3574
  HTYPE = constants.HTYPE_INSTANCE
3575
  _OP_REQP = ["instance_name", "mode", "disks"]
3576

    
3577
  def BuildHooksEnv(self):
3578
    """Build hooks env.
3579

3580
    This runs on the master, the primary and all the secondaries.
3581

3582
    """
3583
    env = {
3584
      "MODE": self.op.mode,
3585
      "NEW_SECONDARY": self.op.remote_node,
3586
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3587
      }
3588
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3589
    nl = [
3590
      self.sstore.GetMasterNode(),
3591
      self.instance.primary_node,
3592
      ]
3593
    if self.op.remote_node is not None:
3594
      nl.append(self.op.remote_node)
3595
    return env, nl, nl
3596

    
3597
  def CheckPrereq(self):
3598
    """Check prerequisites.
3599

3600
    This checks that the instance is in the cluster.
3601

3602
    """
3603
    instance = self.cfg.GetInstanceInfo(
3604
      self.cfg.ExpandInstanceName(self.op.instance_name))
3605
    if instance is None:
3606
      raise errors.OpPrereqError("Instance '%s' not known" %
3607
                                 self.op.instance_name)
3608
    self.instance = instance
3609
    self.op.instance_name = instance.name
3610

    
3611
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3612
      raise errors.OpPrereqError("Instance's disk layout is not"
3613
                                 " network mirrored.")
3614

    
3615
    if len(instance.secondary_nodes) != 1:
3616
      raise errors.OpPrereqError("The instance has a strange layout,"
3617
                                 " expected one secondary but found %d" %
3618
                                 len(instance.secondary_nodes))
3619

    
3620
    self.sec_node = instance.secondary_nodes[0]
3621

    
3622
    remote_node = getattr(self.op, "remote_node", None)
3623
    if remote_node is not None:
3624
      remote_node = self.cfg.ExpandNodeName(remote_node)
3625
      if remote_node is None:
3626
        raise errors.OpPrereqError("Node '%s' not known" %
3627
                                   self.op.remote_node)
3628
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3629
    else:
3630
      self.remote_node_info = None
3631
    if remote_node == instance.primary_node:
3632
      raise errors.OpPrereqError("The specified node is the primary node of"
3633
                                 " the instance.")
3634
    elif remote_node == self.sec_node:
3635
      if self.op.mode == constants.REPLACE_DISK_SEC:
3636
        # this is for DRBD8, where we can't execute the same mode of
3637
        # replacement as for drbd7 (no different port allocated)
3638
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3639
                                   " replacement")
3640
      # the user gave the current secondary, switch to
3641
      # 'no-replace-secondary' mode for drbd7
3642
      remote_node = None
3643
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3644
        self.op.mode != constants.REPLACE_DISK_ALL):
3645
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3646
                                 " disks replacement, not individual ones")
3647
    if instance.disk_template == constants.DT_DRBD8:
3648
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3649
          remote_node is not None):
3650
        # switch to replace secondary mode
3651
        self.op.mode = constants.REPLACE_DISK_SEC
3652

    
3653
      if self.op.mode == constants.REPLACE_DISK_ALL:
3654
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3655
                                   " secondary disk replacement, not"
3656
                                   " both at once")
3657
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3658
        if remote_node is not None:
3659
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3660
                                     " the secondary while doing a primary"
3661
                                     " node disk replacement")
3662
        self.tgt_node = instance.primary_node
3663
        self.oth_node = instance.secondary_nodes[0]
3664
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3665
        self.new_node = remote_node # this can be None, in which case
3666
                                    # we don't change the secondary
3667
        self.tgt_node = instance.secondary_nodes[0]
3668
        self.oth_node = instance.primary_node
3669
      else:
3670
        raise errors.ProgrammerError("Unhandled disk replace mode")
3671

    
3672
    for name in self.op.disks:
3673
      if instance.FindDisk(name) is None:
3674
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3675
                                   (name, instance.name))
3676
    self.op.remote_node = remote_node
3677

    
3678
  def _ExecRR1(self, feedback_fn):
3679
    """Replace the disks of an instance.
3680

3681
    """
3682
    instance = self.instance
3683
    iv_names = {}
3684
    # start of work
3685
    if self.op.remote_node is None:
3686
      remote_node = self.sec_node
3687
    else:
3688
      remote_node = self.op.remote_node
3689
    cfg = self.cfg
3690
    for dev in instance.disks:
3691
      size = dev.size
3692
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3693
      names = _GenerateUniqueNames(cfg, lv_names)
3694
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3695
                                       remote_node, size, names)
3696
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3697
      logger.Info("adding new mirror component on secondary for %s" %
3698
                  dev.iv_name)
3699
      #HARDCODE
3700
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3701
                                        new_drbd, False,
3702
                                        _GetInstanceInfoText(instance)):
3703
        raise errors.OpExecError("Failed to create new component on secondary"
3704
                                 " node %s. Full abort, cleanup manually!" %
3705
                                 remote_node)
3706

    
3707
      logger.Info("adding new mirror component on primary")
3708
      #HARDCODE
3709
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3710
                                      instance, new_drbd,
3711
                                      _GetInstanceInfoText(instance)):
3712
        # remove secondary dev
3713
        cfg.SetDiskID(new_drbd, remote_node)
3714
        rpc.call_blockdev_remove(remote_node, new_drbd)
3715
        raise errors.OpExecError("Failed to create volume on primary!"
3716
                                 " Full abort, cleanup manually!!")
3717

    
3718
      # the device exists now
3719
      # call the primary node to add the mirror to md
3720
      logger.Info("adding new mirror component to md")
3721
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3722
                                           [new_drbd]):
3723
        logger.Error("Can't add mirror compoment to md!")
3724
        cfg.SetDiskID(new_drbd, remote_node)
3725
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3726
          logger.Error("Can't rollback on secondary")
3727
        cfg.SetDiskID(new_drbd, instance.primary_node)
3728
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3729
          logger.Error("Can't rollback on primary")
3730
        raise errors.OpExecError("Full abort, cleanup manually!!")
3731

    
3732
      dev.children.append(new_drbd)
3733
      cfg.AddInstance(instance)
3734

    
3735
    # this can fail as the old devices are degraded and _WaitForSync
3736
    # does a combined result over all disks, so we don't check its
3737
    # return value
3738
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3739

    
3740
    # so check manually all the devices
3741
    for name in iv_names:
3742
      dev, child, new_drbd = iv_names[name]
3743
      cfg.SetDiskID(dev, instance.primary_node)
3744
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3745
      if is_degr:
3746
        raise errors.OpExecError("MD device %s is degraded!" % name)
3747
      cfg.SetDiskID(new_drbd, instance.primary_node)
3748
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3749
      if is_degr:
3750
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3751

    
3752
    for name in iv_names:
3753
      dev, child, new_drbd = iv_names[name]
3754
      logger.Info("remove mirror %s component" % name)
3755
      cfg.SetDiskID(dev, instance.primary_node)
3756
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3757
                                              dev, [child]):
3758
        logger.Error("Can't remove child from mirror, aborting"
3759
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3760
        continue
3761

    
3762
      for node in child.logical_id[:2]:
3763
        logger.Info("remove child device on %s" % node)
3764
        cfg.SetDiskID(child, node)
3765
        if not rpc.call_blockdev_remove(node, child):
3766
          logger.Error("Warning: failed to remove device from node %s,"
3767
                       " continuing operation." % node)
3768

    
3769
      dev.children.remove(child)
3770

    
3771
      cfg.AddInstance(instance)
3772

    
3773
  def _ExecD8DiskOnly(self, feedback_fn):
3774
    """Replace a disk on the primary or secondary for dbrd8.
3775

3776
    The algorithm for replace is quite complicated:
3777
      - for each disk to be replaced:
3778
        - create new LVs on the target node with unique names
3779
        - detach old LVs from the drbd device
3780
        - rename old LVs to name_replaced.<time_t>
3781
        - rename new LVs to old LVs
3782
        - attach the new LVs (with the old names now) to the drbd device
3783
      - wait for sync across all devices
3784
      - for each modified disk:
3785
        - remove old LVs (which have the name name_replaced.<time_t>)
3786

3787
    Failures are not very well handled.
3788

3789
    """
3790
    steps_total = 6
3791
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3792
    instance = self.instance
3793
    iv_names = {}
3794
    vgname = self.cfg.GetVGName()
3795
    # start of work
3796
    cfg = self.cfg
3797
    tgt_node = self.tgt_node
3798
    oth_node = self.oth_node
3799

    
3800
    # Step: check device activation
3801
    self.proc.LogStep(1, steps_total, "check device existence")
3802
    info("checking volume groups")
3803
    my_vg = cfg.GetVGName()
3804
    results = rpc.call_vg_list([oth_node, tgt_node])
3805
    if not results:
3806
      raise errors.OpExecError("Can't list volume groups on the nodes")
3807
    for node in oth_node, tgt_node:
3808
      res = results.get(node, False)
3809
      if not res or my_vg not in res:
3810
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3811
                                 (my_vg, node))
3812
    for dev in instance.disks:
3813
      if not dev.iv_name in self.op.disks:
3814
        continue
3815
      for node in tgt_node, oth_node:
3816
        info("checking %s on %s" % (dev.iv_name, node))
3817
        cfg.SetDiskID(dev, node)
3818
        if not rpc.call_blockdev_find(node, dev):
3819
          raise errors.OpExecError("Can't find device %s on node %s" %
3820
                                   (dev.iv_name, node))
3821

    
3822
    # Step: check other node consistency
3823
    self.proc.LogStep(2, steps_total, "check peer consistency")
3824
    for dev in instance.disks:
3825
      if not dev.iv_name in self.op.disks:
3826
        continue
3827
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3828
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3829
                                   oth_node==instance.primary_node):
3830
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3831
                                 " to replace disks on this node (%s)" %
3832
                                 (oth_node, tgt_node))
3833

    
3834
    # Step: create new storage
3835
    self.proc.LogStep(3, steps_total, "allocate new storage")
3836
    for dev in instance.disks:
3837
      if not dev.iv_name in self.op.disks:
3838
        continue
3839
      size = dev.size
3840
      cfg.SetDiskID(dev, tgt_node)
3841
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3842
      names = _GenerateUniqueNames(cfg, lv_names)
3843
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3844
                             logical_id=(vgname, names[0]))
3845
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3846
                             logical_id=(vgname, names[1]))
3847
      new_lvs = [lv_data, lv_meta]
3848
      old_lvs = dev.children
3849
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3850
      info("creating new local storage on %s for %s" %
3851
           (tgt_node, dev.iv_name))
3852
      # since we *always* want to create this LV, we use the
3853
      # _Create...OnPrimary (which forces the creation), even if we
3854
      # are talking about the secondary node
3855
      for new_lv in new_lvs:
3856
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3857
                                        _GetInstanceInfoText(instance)):
3858
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3859
                                   " node '%s'" %
3860
                                   (new_lv.logical_id[1], tgt_node))
3861

    
3862
    # Step: for each lv, detach+rename*2+attach
3863
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3864
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3865
      info("detaching %s drbd from local storage" % dev.iv_name)
3866
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3867
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3868
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3869
      #dev.children = []
3870
      #cfg.Update(instance)
3871

    
3872
      # ok, we created the new LVs, so now we know we have the needed
3873
      # storage; as such, we proceed on the target node to rename
3874
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3875
      # using the assumption that logical_id == physical_id (which in
3876
      # turn is the unique_id on that node)
3877

    
3878
      # FIXME(iustin): use a better name for the replaced LVs
3879
      temp_suffix = int(time.time())
3880
      ren_fn = lambda d, suff: (d.physical_id[0],
3881
                                d.physical_id[1] + "_replaced-%s" % suff)
3882
      # build the rename list based on what LVs exist on the node
3883
      rlist = []
3884
      for to_ren in old_lvs:
3885
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3886
        if find_res is not None: # device exists
3887
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3888

    
3889
      info("renaming the old LVs on the target node")
3890
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3891
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3892
      # now we rename the new LVs to the old LVs
3893
      info("renaming the new LVs on the target node")
3894
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3895
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3896
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3897

    
3898
      for old, new in zip(old_lvs, new_lvs):
3899
        new.logical_id = old.logical_id
3900
        cfg.SetDiskID(new, tgt_node)
3901

    
3902
      for disk in old_lvs:
3903
        disk.logical_id = ren_fn(disk, temp_suffix)
3904
        cfg.SetDiskID(disk, tgt_node)
3905

    
3906
      # now that the new lvs have the old name, we can add them to the device
3907
      info("adding new mirror component on %s" % tgt_node)
3908
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3909
        for new_lv in new_lvs:
3910
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3911
            warning("Can't rollback device %s", hint="manually cleanup unused"
3912
                    " logical volumes")
3913
        raise errors.OpExecError("Can't add local storage to drbd")
3914

    
3915
      dev.children = new_lvs
3916
      cfg.Update(instance)
3917

    
3918
    # Step: wait for sync
3919

    
3920
    # this can fail as the old devices are degraded and _WaitForSync
3921
    # does a combined result over all disks, so we don't check its
3922
    # return value
3923
    self.proc.LogStep(5, steps_total, "sync devices")
3924
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3925

    
3926
    # so check manually all the devices
3927
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3928
      cfg.SetDiskID(dev, instance.primary_node)
3929
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3930
      if is_degr:
3931
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3932

    
3933
    # Step: remove old storage
3934
    self.proc.LogStep(6, steps_total, "removing old storage")
3935
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3936
      info("remove logical volumes for %s" % name)
3937
      for lv in old_lvs:
3938
        cfg.SetDiskID(lv, tgt_node)
3939
        if not rpc.call_blockdev_remove(tgt_node, lv):
3940
          warning("Can't remove old LV", hint="manually remove unused LVs")
3941
          continue
3942

    
3943
  def _ExecD8Secondary(self, feedback_fn):
3944
    """Replace the secondary node for drbd8.
3945

3946
    The algorithm for replace is quite complicated:
3947
      - for all disks of the instance:
3948
        - create new LVs on the new node with same names
3949
        - shutdown the drbd device on the old secondary
3950
        - disconnect the drbd network on the primary
3951
        - create the drbd device on the new secondary
3952
        - network attach the drbd on the primary, using an artifice:
3953
          the drbd code for Attach() will connect to the network if it
3954
          finds a device which is connected to the good local disks but
3955
          not network enabled
3956
      - wait for sync across all devices
3957
      - remove all disks from the old secondary
3958

3959
    Failures are not very well handled.
3960

3961
    """
3962
    steps_total = 6
3963
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3964
    instance = self.instance
3965
    iv_names = {}
3966
    vgname = self.cfg.GetVGName()
3967
    # start of work
3968
    cfg = self.cfg
3969
    old_node = self.tgt_node
3970
    new_node = self.new_node
3971
    pri_node = instance.primary_node
3972

    
3973
    # Step: check device activation
3974
    self.proc.LogStep(1, steps_total, "check device existence")
3975
    info("checking volume groups")
3976
    my_vg = cfg.GetVGName()
3977
    results = rpc.call_vg_list([pri_node, new_node])
3978
    if not results:
3979
      raise errors.OpExecError("Can't list volume groups on the nodes")
3980
    for node in pri_node, new_node:
3981
      res = results.get(node, False)
3982
      if not res or my_vg not in res:
3983
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3984
                                 (my_vg, node))
3985
    for dev in instance.disks:
3986
      if not dev.iv_name in self.op.disks:
3987
        continue
3988
      info("checking %s on %s" % (dev.iv_name, pri_node))
3989
      cfg.SetDiskID(dev, pri_node)
3990
      if not rpc.call_blockdev_find(pri_node, dev):
3991
        raise errors.OpExecError("Can't find device %s on node %s" %
3992
                                 (dev.iv_name, pri_node))
3993

    
3994
    # Step: check other node consistency
3995
    self.proc.LogStep(2, steps_total, "check peer consistency")
3996
    for dev in instance.disks:
3997
      if not dev.iv_name in self.op.disks:
3998
        continue
3999
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4000
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4001
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4002
                                 " unsafe to replace the secondary" %
4003
                                 pri_node)
4004

    
4005
    # Step: create new storage
4006
    self.proc.LogStep(3, steps_total, "allocate new storage")
4007
    for dev in instance.disks:
4008
      size = dev.size
4009
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4010
      # since we *always* want to create this LV, we use the
4011
      # _Create...OnPrimary (which forces the creation), even if we
4012
      # are talking about the secondary node
4013
      for new_lv in dev.children:
4014
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4015
                                        _GetInstanceInfoText(instance)):
4016
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4017
                                   " node '%s'" %
4018
                                   (new_lv.logical_id[1], new_node))
4019

    
4020
      iv_names[dev.iv_name] = (dev, dev.children)
4021

    
4022
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4023
    for dev in instance.disks:
4024
      size = dev.size
4025
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4026
      # create new devices on new_node
4027
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4028
                              logical_id=(pri_node, new_node,
4029
                                          dev.logical_id[2]),
4030
                              children=dev.children)
4031
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4032
                                        new_drbd, False,
4033
                                      _GetInstanceInfoText(instance)):
4034
        raise errors.OpExecError("Failed to create new DRBD on"
4035
                                 " node '%s'" % new_node)
4036

    
4037
    for dev in instance.disks:
4038
      # we have new devices, shutdown the drbd on the old secondary
4039
      info("shutting down drbd for %s on old node" % dev.iv_name)
4040
      cfg.SetDiskID(dev, old_node)
4041
      if not rpc.call_blockdev_shutdown(old_node, dev):
4042
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4043
                hint="Please cleanup this device manually as soon as possible")
4044

    
4045
    info("detaching primary drbds from the network (=> standalone)")
4046
    done = 0
4047
    for dev in instance.disks:
4048
      cfg.SetDiskID(dev, pri_node)
4049
      # set the physical (unique in bdev terms) id to None, meaning
4050
      # detach from network
4051
      dev.physical_id = (None,) * len(dev.physical_id)
4052
      # and 'find' the device, which will 'fix' it to match the
4053
      # standalone state
4054
      if rpc.call_blockdev_find(pri_node, dev):
4055
        done += 1
4056
      else:
4057
        warning("Failed to detach drbd %s from network, unusual case" %
4058
                dev.iv_name)
4059

    
4060
    if not done:
4061
      # no detaches succeeded (very unlikely)
4062
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4063

    
4064
    # if we managed to detach at least one, we update all the disks of
4065
    # the instance to point to the new secondary
4066
    info("updating instance configuration")
4067
    for dev in instance.disks:
4068
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4069
      cfg.SetDiskID(dev, pri_node)
4070
    cfg.Update(instance)
4071

    
4072
    # and now perform the drbd attach
4073
    info("attaching primary drbds to new secondary (standalone => connected)")
4074
    failures = []
4075
    for dev in instance.disks:
4076
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4077
      # since the attach is smart, it's enough to 'find' the device,
4078
      # it will automatically activate the network, if the physical_id
4079
      # is correct
4080
      cfg.SetDiskID(dev, pri_node)
4081
      if not rpc.call_blockdev_find(pri_node, dev):
4082
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4083
                "please do a gnt-instance info to see the status of disks")
4084

    
4085
    # this can fail as the old devices are degraded and _WaitForSync
4086
    # does a combined result over all disks, so we don't check its
4087
    # return value
4088
    self.proc.LogStep(5, steps_total, "sync devices")
4089
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4090

    
4091
    # so check manually all the devices
4092
    for name, (dev, old_lvs) in iv_names.iteritems():
4093
      cfg.SetDiskID(dev, pri_node)
4094
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4095
      if is_degr:
4096
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4097

    
4098
    self.proc.LogStep(6, steps_total, "removing old storage")
4099
    for name, (dev, old_lvs) in iv_names.iteritems():
4100
      info("remove logical volumes for %s" % name)
4101
      for lv in old_lvs:
4102
        cfg.SetDiskID(lv, old_node)
4103
        if not rpc.call_blockdev_remove(old_node, lv):
4104
          warning("Can't remove LV on old secondary",
4105
                  hint="Cleanup stale volumes by hand")
4106

    
4107
  def Exec(self, feedback_fn):
4108
    """Execute disk replacement.
4109

4110
    This dispatches the disk replacement to the appropriate handler.
4111

4112
    """
4113
    instance = self.instance
4114
    if instance.disk_template == constants.DT_REMOTE_RAID1:
4115
      fn = self._ExecRR1
4116
    elif instance.disk_template == constants.DT_DRBD8:
4117
      if self.op.remote_node is None:
4118
        fn = self._ExecD8DiskOnly
4119
      else:
4120
        fn = self._ExecD8Secondary
4121
    else:
4122
      raise errors.ProgrammerError("Unhandled disk replacement case")
4123
    return fn(feedback_fn)
4124

    
4125

    
4126
class LUQueryInstanceData(NoHooksLU):
4127
  """Query runtime instance data.
4128

4129
  """
4130
  _OP_REQP = ["instances"]
4131

    
4132
  def CheckPrereq(self):
4133
    """Check prerequisites.
4134

4135
    This only checks the optional instance list against the existing names.
4136

4137
    """
4138
    if not isinstance(self.op.instances, list):
4139
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4140
    if self.op.instances:
4141
      self.wanted_instances = []
4142
      names = self.op.instances
4143
      for name in names:
4144
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4145
        if instance is None:
4146
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4147
        self.wanted_instances.append(instance)
4148
    else:
4149
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4150
                               in self.cfg.GetInstanceList()]
4151
    return
4152

    
4153

    
4154
  def _ComputeDiskStatus(self, instance, snode, dev):
4155
    """Compute block device status.
4156

4157
    """
4158
    self.cfg.SetDiskID(dev, instance.primary_node)
4159
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4160
    if dev.dev_type in constants.LDS_DRBD:
4161
      # we change the snode then (otherwise we use the one passed in)
4162
      if dev.logical_id[0] == instance.primary_node:
4163
        snode = dev.logical_id[1]
4164
      else:
4165
        snode = dev.logical_id[0]
4166

    
4167
    if snode:
4168
      self.cfg.SetDiskID(dev, snode)
4169
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4170
    else:
4171
      dev_sstatus = None
4172

    
4173
    if dev.children:
4174
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4175
                      for child in dev.children]
4176
    else:
4177
      dev_children = []
4178

    
4179
    data = {
4180
      "iv_name": dev.iv_name,
4181
      "dev_type": dev.dev_type,
4182
      "logical_id": dev.logical_id,
4183
      "physical_id": dev.physical_id,
4184
      "pstatus": dev_pstatus,
4185
      "sstatus": dev_sstatus,
4186
      "children": dev_children,
4187
      }
4188

    
4189
    return data
4190

    
4191
  def Exec(self, feedback_fn):
4192
    """Gather and return data"""
4193
    result = {}
4194
    for instance in self.wanted_instances:
4195
      remote_info = rpc.call_instance_info(instance.primary_node,
4196
                                                instance.name)
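      # a non-empty reply from the node means the instance is running there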
4197
      if remote_info and "state" in remote_info:
4198
        remote_state = "up"
4199
      else:
4200
        remote_state = "down"
4201
      if instance.status == "down":
4202
        config_state = "down"
4203
      else:
4204
        config_state = "up"
4205

    
4206
      disks = [self._ComputeDiskStatus(instance, None, device)
4207
               for device in instance.disks]
4208

    
4209
      idict = {
4210
        "name": instance.name,
4211
        "config_state": config_state,
4212
        "run_state": remote_state,
4213
        "pnode": instance.primary_node,
4214
        "snodes": instance.secondary_nodes,
4215
        "os": instance.os,
4216
        "memory": instance.memory,
4217
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4218
        "disks": disks,
4219
        "network_port": instance.network_port,
4220
        "vcpus": instance.vcpus,
4221
        "kernel_path": instance.kernel_path,
4222
        "initrd_path": instance.initrd_path,
4223
        "hvm_boot_order": instance.hvm_boot_order,
4224
        }
4225

    
4226
      result[instance.name] = idict
4227

    
4228
    return result
4229

    
4230

    
4231
class LUSetInstanceParams(LogicalUnit):
4232
  """Modifies an instances's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_params.count(None) == len(all_params):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result
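  # Example (illustrative sketch): Exec returns a list of (parameter,
  # new value) pairs describing what was changed, e.g. a call that set the
  # memory and the bridge would yield something like
  #   [("mem", 512), ("bridge", "br0")]
  # (the concrete values here are hypothetical).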


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
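  # Example (illustrative sketch): the returned mapping keys node names to
  # the export names stored on that node, e.g.
  #   {"node1.example.com": ["instance1.example.com"],
  #    "node2.example.com": []}
  # (the host names above are hypothetical).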


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
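  # Example (illustrative sketch): each match is a (path, tag) pair, so a
  # search for "web" might return something like
  #   [("/cluster", "web-cluster"), ("/instances/inst1.example.com", "web")]
  # (the tag values and instance name above are hypothetical).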


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _KEYS = [
    "mode", "name",
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]

  def __init__(self, cfg, sstore, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = self.name = None
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    # init result fields
    self.success = self.info = self.nodes = None
    for key in kwargs:
      if key not in self._KEYS:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in self._KEYS:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      # we don't have job IDs
      }

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free',
                   'vg_size', 'vg_free']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": utils.TryConvert(int, remote_info['memory_total']),
        "free_memory": utils.TryConvert(int, remote_info['memory_free']),
        "total_disk": utils.TryConvert(int, remote_info['vg_size']),
        "free_disk": utils.TryConvert(int, remote_info['vg_free']),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    i_list = cfg.GetInstanceList()
    for iname in i_list:
      iinfo = cfg.GetInstanceInfo(iname)
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iname] = pir

    data["instances"] = instance_data

    self.in_data = data
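    # Illustrative sketch of the result: at this point self.in_data holds the
    # operation-independent skeleton; _AddNewInstance or _AddRelocateInstance
    # later add the "request" key.  Roughly:
    #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
    #    "nodes": {"node1.example.com": {"total_memory": ..., ...}, ...},
    #    "instances": {"inst1.example.com": {"memory": ..., ...}, ...}}
    # (the host names above are hypothetical).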

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    request = {
      "type": "replace_secondary",
      "name": self.name,
      }
    data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      raise errors.OpExecError("Can't find allocator '%s'" % name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, data)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        raise errors.OpExecError("Instance allocator call failed: %s,"
                                 " output: %s" %
                                 (result.fail_reason, result.stdout))
    finally:
      os.unlink(fin_name)
    self.out_text = result.stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
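    # Illustrative sketch: a well-formed reply from the external script, once
    # parsed by serializer.Load, looks roughly like
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}
    # (the message text and node names above are hypothetical).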


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the test direction
    and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=self.op.mode,
                     name=self.op.name,
                     mem_size=self.op.mem_size,
                     disks=self.op.disks,
                     disk_template=self.op.disk_template,
                     os=self.op.os,
                     tags=self.op.tags,
                     nics=self.op.nics,
                     vcpus=self.op.vcpus,
                     )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result