root / lib / cmdlib.py @ 8a3fe350


#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


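# Illustrative sketch (not part of the original file): a minimal logical
# unit following the rules listed in the LogicalUnit docstring above. The
# class and the "node_name" opcode field are hypothetical; real LUs are
# dispatched from opcodes by the master processor.
#
#   class LUExampleNode(LogicalUnit):
#     HPATH = "node-example"
#     HTYPE = constants.HTYPE_NODE
#     _OP_REQP = ["node_name"]
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.op.node_name}
#       return env, [], [self.op.node_name]
#
#     def CheckPrereq(self):
#       # canonicalise the opcode parameters, raising OpPrereqError on problems
#       node = self.cfg.ExpandNodeName(self.op.node_name)
#       if node is None:
#         raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
#       self.op.node_name = node
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Would operate on node %s" % self.op.node_name)
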
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


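# Usage sketch (illustration only): the query LUs below call the helper as
#   _CheckOutputFields(static=["name", "pinst_cnt", ...],
#                      dynamic=self.dynamic_fields,
#                      selected=self.op.output_fields)
# and any selected field outside static | dynamic makes it raise an
# OpPrereqError naming the unknown fields.
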
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


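# Worked example (illustration only, values are made up): a call such as
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch", "up",
#                         512, 1, [("1.2.3.4", "xen-br0", "aa:00:00:11:22:33")])
# returns an environment containing, among other keys:
#   INSTANCE_NAME=inst1, INSTANCE_PRIMARY=node1, INSTANCE_SECONDARIES=node2,
#   INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_IP=1.2.3.4, INSTANCE_NIC0_BRIDGE=xen-br0
# The hooks runner later adds the GANETI_ prefix to each key.
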
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


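# Note added for clarity: the per-disk status consumed above is a 4-tuple of
# (sync_percent, estimated_time, is_degraded, ldisk). _WaitForSync ignores the
# trailing ldisk flag, while _CheckDiskConsistency below picks either the
# is_degraded field (index 5) or the ldisk field (index 6) out of the longer
# status tuple returned by rpc.call_blockdev_find.
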
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1235
  """Check that mirrors are not degraded.
1236

1237
  The ldisk parameter, if True, will change the test from the
1238
  is_degraded attribute (which represents overall non-ok status for
1239
  the device(s)) to the ldisk (representing the local storage status).
1240

1241
  """
1242
  cfgw.SetDiskID(dev, node)
1243
  if ldisk:
1244
    idx = 6
1245
  else:
1246
    idx = 5
1247

    
1248
  result = True
1249
  if on_primary or dev.AssembleOnSecondary():
1250
    rstats = rpc.call_blockdev_find(node, dev)
1251
    if not rstats:
1252
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1253
      result = False
1254
    else:
1255
      result = result and (not rstats[idx])
1256
  if dev.children:
1257
    for child in dev.children:
1258
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1259

    
1260
  return result
1261

    
1262

    
1263
class LUDiagnoseOS(NoHooksLU):
1264
  """Logical unit for OS diagnose/query.
1265

1266
  """
1267
  _OP_REQP = ["output_fields", "names"]
1268

    
1269
  def CheckPrereq(self):
1270
    """Check prerequisites.
1271

1272
    This always succeeds, since this is a pure query LU.
1273

1274
    """
1275
    if self.op.names:
1276
      raise errors.OpPrereqError("Selective OS query not supported")
1277

    
1278
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1279
    _CheckOutputFields(static=[],
1280
                       dynamic=self.dynamic_fields,
1281
                       selected=self.op.output_fields)
1282

    
1283
  @staticmethod
1284
  def _DiagnoseByOS(node_list, rlist):
1285
    """Remaps a per-node return list into an a per-os per-node dictionary
1286

1287
      Args:
1288
        node_list: a list with the names of all nodes
1289
        rlist: a map with node names as keys and OS objects as values
1290

1291
      Returns:
1292
        map: a map with osnames as keys and as value another map, with
1293
             nodes as
1294
             keys and list of OS objects as values
1295
             e.g. {"debian-etch": {"node1": [<object>,...],
1296
                                   "node2": [<object>,]}
1297
                  }
1298

1299
    """
1300
    all_os = {}
1301
    for node_name, nr in rlist.iteritems():
1302
      if not nr:
1303
        continue
1304
      for os_obj in nr:
1305
        if os_obj.name not in all_os:
1306
          # build a list of nodes for this os containing empty lists
1307
          # for each node in node_list
1308
          all_os[os_obj.name] = {}
1309
          for nname in node_list:
1310
            all_os[os_obj.name][nname] = []
1311
        all_os[os_obj.name][node_name].append(os_obj)
1312
    return all_os
1313

    
1314
  def Exec(self, feedback_fn):
1315
    """Compute the list of OSes.
1316

1317
    """
1318
    node_list = self.cfg.GetNodeList()
1319
    node_data = rpc.call_os_diagnose(node_list)
1320
    if node_data == False:
1321
      raise errors.OpExecError("Can't gather the list of OSes")
1322
    pol = self._DiagnoseByOS(node_list, node_data)
1323
    output = []
1324
    for os_name, os_data in pol.iteritems():
1325
      row = []
1326
      for field in self.op.output_fields:
1327
        if field == "name":
1328
          val = os_name
1329
        elif field == "valid":
1330
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1331
        elif field == "node_status":
1332
          val = {}
1333
          for node_name, nos_list in os_data.iteritems():
1334
            val[node_name] = [(v.status, v.path) for v in nos_list]
1335
        else:
1336
          raise errors.ParameterError(field)
1337
        row.append(val)
1338
      output.append(row)
1339

    
1340
    return output
1341

    
1342

    
1343
class LURemoveNode(LogicalUnit):
1344
  """Logical unit for removing a node.
1345

1346
  """
1347
  HPATH = "node-remove"
1348
  HTYPE = constants.HTYPE_NODE
1349
  _OP_REQP = ["node_name"]
1350

    
1351
  def BuildHooksEnv(self):
1352
    """Build hooks env.
1353

1354
    This doesn't run on the target node in the pre phase as a failed
1355
    node would not allows itself to run.
1356

1357
    """
1358
    env = {
1359
      "OP_TARGET": self.op.node_name,
1360
      "NODE_NAME": self.op.node_name,
1361
      }
1362
    all_nodes = self.cfg.GetNodeList()
1363
    all_nodes.remove(self.op.node_name)
1364
    return env, all_nodes, all_nodes
1365

    
1366
  def CheckPrereq(self):
1367
    """Check prerequisites.
1368

1369
    This checks:
1370
     - the node exists in the configuration
1371
     - it does not have primary or secondary instances
1372
     - it's not the master
1373

1374
    Any errors are signalled by raising errors.OpPrereqError.
1375

1376
    """
1377
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1378
    if node is None:
1379
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1380

    
1381
    instance_list = self.cfg.GetInstanceList()
1382

    
1383
    masternode = self.sstore.GetMasterNode()
1384
    if node.name == masternode:
1385
      raise errors.OpPrereqError("Node is the master node,"
1386
                                 " you need to failover first.")
1387

    
1388
    for instance_name in instance_list:
1389
      instance = self.cfg.GetInstanceInfo(instance_name)
1390
      if node.name == instance.primary_node:
1391
        raise errors.OpPrereqError("Instance %s still running on the node,"
1392
                                   " please remove first." % instance_name)
1393
      if node.name in instance.secondary_nodes:
1394
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1395
                                   " please remove first." % instance_name)
1396
    self.op.node_name = node.name
1397
    self.node = node
1398

    
1399
  def Exec(self, feedback_fn):
1400
    """Removes the node from the cluster.
1401

1402
    """
1403
    node = self.node
1404
    logger.Info("stopping the node daemon and removing configs from node %s" %
1405
                node.name)
1406

    
1407
    rpc.call_node_leave_cluster(node.name)
1408

    
1409
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1410

    
1411
    logger.Info("Removing node %s from config" % node.name)
1412

    
1413
    self.cfg.RemoveNode(node.name)
1414

    
1415
    _RemoveHostFromEtcHosts(node.name)
1416

    
1417

    
1418
class LUQueryNodes(NoHooksLU):
1419
  """Logical unit for querying nodes.
1420

1421
  """
1422
  _OP_REQP = ["output_fields", "names"]
1423

    
1424
  def CheckPrereq(self):
1425
    """Check prerequisites.
1426

1427
    This checks that the fields required are valid output fields.
1428

1429
    """
1430
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1431
                                     "mtotal", "mnode", "mfree",
1432
                                     "bootid"])
1433

    
1434
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1435
                               "pinst_list", "sinst_list",
1436
                               "pip", "sip"],
1437
                       dynamic=self.dynamic_fields,
1438
                       selected=self.op.output_fields)
1439

    
1440
    self.wanted = _GetWantedNodes(self, self.op.names)
1441

    
1442
  def Exec(self, feedback_fn):
1443
    """Computes the list of nodes and their attributes.
1444

1445
    """
1446
    nodenames = self.wanted
1447
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1448

    
1449
    # begin data gathering
1450

    
1451
    if self.dynamic_fields.intersection(self.op.output_fields):
1452
      live_data = {}
1453
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1454
      for name in nodenames:
1455
        nodeinfo = node_data.get(name, None)
1456
        if nodeinfo:
1457
          live_data[name] = {
1458
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1459
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1460
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1461
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1462
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1463
            "bootid": nodeinfo['bootid'],
1464
            }
1465
        else:
1466
          live_data[name] = {}
1467
    else:
1468
      live_data = dict.fromkeys(nodenames, {})
1469

    
1470
    node_to_primary = dict([(name, set()) for name in nodenames])
1471
    node_to_secondary = dict([(name, set()) for name in nodenames])
1472

    
1473
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1474
                             "sinst_cnt", "sinst_list"))
1475
    if inst_fields & frozenset(self.op.output_fields):
1476
      instancelist = self.cfg.GetInstanceList()
1477

    
1478
      for instance_name in instancelist:
1479
        inst = self.cfg.GetInstanceInfo(instance_name)
1480
        if inst.primary_node in node_to_primary:
1481
          node_to_primary[inst.primary_node].add(inst.name)
1482
        for secnode in inst.secondary_nodes:
1483
          if secnode in node_to_secondary:
1484
            node_to_secondary[secnode].add(inst.name)
1485

    
1486
    # end data gathering
1487

    
1488
    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

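    # map every instance to its logical volumes per node, so the "instance"
    # output field below can report which instance owns a given volume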
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

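    # verify that the new node's IP addresses do not clash with those of any
    # existing node; for a readd of the same node the addresses must be
    # unchanged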
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

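    # run the bootstrap command over ssh; batch=False and ask_key=True allow
    # interactive prompts, since the new node is not yet a known host.  The
    # command roughly expands to (illustrative only):
    #   umask 077 && echo '<noded password>' > '<password file>' &&
    #   cat > '<ssl cert file>' << '!EOF.' && <PEM data>!EOF.
    #   <init script> restart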
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

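    # record the new master in the simple store and push the updated file
    # to all nodes, so that every node agrees on who the master is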
    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")




class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The target nodes are taken from the 'nodes' opcode parameter; if it
    is empty, all nodes are used.  The master itself is skipped.

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

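    # soft and hard reboots are handled by a single call to the node daemon;
    # a full reboot is emulated by shutting the instance down, restarting its
    # disks and starting it again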
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

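    # the OS create scripts need the instance's disks to be active; they are
    # brought up just for the reinstall and shut down again afterwards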
    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
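        # the "status" field combines the configured (admin) state with the
        # live data: running, ADMIN_down, ERROR_up, ERROR_down, or
        # ERROR_nodedown when the primary node cannot be contacted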
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

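    # the instance now lives on the target node as far as the cluster is
    # concerned; record and distribute that before (re)starting it below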
    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate a set of unique logical volume names, one for each
  of the given suffixes.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
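  # DRBD8: each of sda/sdb becomes a DRBD device mirrored between the
  # primary and the single secondary node, backed by a data LV plus a small
  # metadata LV (see _GenerateDRBD8Branch above)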
  elif template_name == constants.DT_DRBD8:
2986
    if len(secondary_nodes) != 1:
2987
      raise errors.ProgrammerError("Wrong template configuration")
2988
    remote_node = secondary_nodes[0]
2989
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2990
                                       ".sdb_data", ".sdb_meta"])
2991
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2992
                                         disk_sz, names[0:2], "sda")
2993
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2994
                                         swap_sz, names[2:4], "sdb")
2995
    disks = [drbd_sda_dev, drbd_sdb_dev]
2996
  elif template_name == constants.DT_FILE:
2997
    if len(secondary_nodes) != 0:
2998
      raise errors.ProgrammerError("Wrong template configuration")
2999

    
3000
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3001
                                iv_name="sda", logical_id=(file_driver,
3002
                                "%s/sda" % file_storage_dir))
3003
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3004
                                iv_name="sdb", logical_id=(file_driver,
3005
                                "%s/sdb" % file_storage_dir))
3006
    disks = [file_sda_dev, file_sdb_dev]
3007
  else:
3008
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3009
  return disks
3010
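# Illustrative sketch (not part of the original source): a typical call for
# the drbd8 template, with made-up sizes and node names.  The result is a
# list of two mirrored disk trees, "sda" of disk_sz MB and "sdb" (swap) of
# swap_sz MB; file_storage_dir and file_driver are only used for DT_FILE.
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1", ["node2"],
#                                 10240, 4096, "", None)
#   # len(disks) == 2, disks[0].iv_name == "sda", disks[1].iv_name == "sdb"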

    
3011

    
3012
def _GetInstanceInfoText(instance):
3013
  """Compute that text that should be added to the disk's metadata.
3014

3015
  """
3016
  return "originstname+%s" % instance.name
3017
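# For example (illustrative): an instance named "inst1.example.com" gets the
# text "originstname+inst1.example.com" recorded in its devices' metadata.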

    
3018

    
3019
def _CreateDisks(cfg, instance):
3020
  """Create all disks for an instance.
3021

3022
  This abstracts away some work from AddInstance.
3023

3024
  Args:
3025
    instance: the instance object
3026

3027
  Returns:
3028
    True or False showing the success of the creation process
3029

3030
  """
3031
  info = _GetInstanceInfoText(instance)
3032

    
3033
  if instance.disk_template == constants.DT_FILE:
3034
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3035
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3036
                                              file_storage_dir)
3037

    
3038
    if not result:
3039
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3040
      return False
3041

    
3042
    if not result[0]:
3043
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3044
      return False
3045

    
3046
  for device in instance.disks:
3047
    logger.Info("creating volume %s for instance %s" %
3048
                (device.iv_name, instance.name))
3049
    #HARDCODE
3050
    for secondary_node in instance.secondary_nodes:
3051
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3052
                                        device, False, info):
3053
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3054
                     (device.iv_name, device, secondary_node))
3055
        return False
3056
    #HARDCODE
3057
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3058
                                    instance, device, info):
3059
      logger.Error("failed to create volume %s on primary!" %
3060
                   device.iv_name)
3061
      return False
3062

    
3063
  return True
3064
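# Typical usage (illustrative sketch, mirroring the pattern LUCreateInstance
# uses further below): create the disks and roll back on failure.
#
#   if not _CreateDisks(cfg, instance):
#     _RemoveDisks(instance, cfg)
#     raise errors.OpExecError("Device creation failed, reverting...")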

    
3065

    
3066
def _RemoveDisks(instance, cfg):
3067
  """Remove all disks for an instance.
3068

3069
  This abstracts away some work from `AddInstance()` and
3070
  `RemoveInstance()`. Note that in case some of the devices couldn't
3071
  be removed, the removal will continue with the other ones (compare
3072
  with `_CreateDisks()`).
3073

3074
  Args:
3075
    instance: the instance object
3076

3077
  Returns:
3078
    True or False showing the success of the removal process
3079

3080
  """
3081
  logger.Info("removing block devices for instance %s" % instance.name)
3082

    
3083
  result = True
3084
  for device in instance.disks:
3085
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3086
      cfg.SetDiskID(disk, node)
3087
      if not rpc.call_blockdev_remove(node, disk):
3088
        logger.Error("could not remove block device %s on node %s,"
3089
                     " continuing anyway" %
3090
                     (device.iv_name, node))
3091
        result = False
3092

    
3093
  if instance.disk_template == constants.DT_FILE:
3094
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3095
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3096
                                            file_storage_dir):
3097
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3098
      result = False
3099

    
3100
  return result
3101

    
3102

    
3103
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3104
  """Compute disk size requirements in the volume group
3105

3106
  This is currently hard-coded for the two-drive layout.
3107

3108
  """
3109
  # Required free disk space as a function of disk and swap space
3110
  req_size_dict = {
3111
    constants.DT_DISKLESS: None,
3112
    constants.DT_PLAIN: disk_size + swap_size,
3113
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
3114
    constants.DT_DRBD8: disk_size + swap_size + 256,
3115
    constants.DT_FILE: None,
3116
  }
3117

    
3118
  if disk_template not in req_size_dict:
3119
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3120
                                 " is unknown" %  disk_template)
3121

    
3122
  return req_size_dict[disk_template]
3123
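# Worked example (illustrative, values made up): for a 10240 MB data disk and
# a 4096 MB swap disk the required free space in the volume group is:
#
#   _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096)     == 14336
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)     == 14592 (+256 meta)
#   _ComputeDiskSize(constants.DT_DISKLESS, 10240, 4096)  is None (no LVM use)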

    
3124

    
3125
class LUCreateInstance(LogicalUnit):
3126
  """Create an instance.
3127

3128
  """
3129
  HPATH = "instance-add"
3130
  HTYPE = constants.HTYPE_INSTANCE
3131
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3132
              "disk_template", "swap_size", "mode", "start", "vcpus",
3133
              "wait_for_sync", "ip_check", "mac"]
3134

    
3135
  def _RunAllocator(self):
3136
    """Run the allocator based on input opcode.
3137

3138
    """
3139
    disks = [{"size": self.op.disk_size, "mode": "w"},
3140
             {"size": self.op.swap_size, "mode": "w"}]
3141
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3142
             "bridge": self.op.bridge}]
3143
    ial = IAllocator(self.cfg, self.sstore,
3144
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3145
                     name=self.op.instance_name,
3146
                     disk_template=self.op.disk_template,
3147
                     tags=[],
3148
                     os=self.op.os_type,
3149
                     vcpus=self.op.vcpus,
3150
                     mem_size=self.op.mem_size,
3151
                     disks=disks,
3152
                     nics=nics,
3153
                     )
3154

    
3155
    ial.Run(self.op.iallocator)
3156

    
3157
    if not ial.success:
3158
      raise errors.OpPrereqError("Can't compute nodes using"
3159
                                 " iallocator '%s': %s" % (self.op.iallocator,
3160
                                                           ial.info))
3161
    if len(ial.nodes) != ial.required_nodes:
3162
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3163
                                 " of nodes (%s), required %s" %
3164
                                 (len(ial.nodes), ial.required_nodes))
3165
    self.op.pnode = ial.nodes[0]
3166
    logger.ToStdout("Selected nodes for the instance: %s" %
3167
                    (", ".join(ial.nodes),))
3168
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3169
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3170
    if ial.required_nodes == 2:
3171
      self.op.snode = ial.nodes[1]
3172
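  # Illustrative sketch (made-up values): the allocator request built above
  # describes the new instance with plain python structures, e.g.
  #
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  #
  # and a successful run leaves the chosen primary in self.op.pnode (plus
  # self.op.snode when the template needs a secondary node).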

    
3173
  def BuildHooksEnv(self):
3174
    """Build hooks env.
3175

3176
    This runs on master, primary and secondary nodes of the instance.
3177

3178
    """
3179
    env = {
3180
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3181
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3182
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3183
      "INSTANCE_ADD_MODE": self.op.mode,
3184
      }
3185
    if self.op.mode == constants.INSTANCE_IMPORT:
3186
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3187
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3188
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3189

    
3190
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3191
      primary_node=self.op.pnode,
3192
      secondary_nodes=self.secondaries,
3193
      status=self.instance_status,
3194
      os_type=self.op.os_type,
3195
      memory=self.op.mem_size,
3196
      vcpus=self.op.vcpus,
3197
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3198
    ))
3199

    
3200
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3201
          self.secondaries)
3202
    return env, nl, nl
3203

    
3204

    
3205
  def CheckPrereq(self):
3206
    """Check prerequisites.
3207

3208
    """
3209
    # set optional parameters to none if they don't exist
3210
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3211
                 "iallocator"]:
3212
      if not hasattr(self.op, attr):
3213
        setattr(self.op, attr, None)
3214

    
3215
    if self.op.mode not in (constants.INSTANCE_CREATE,
3216
                            constants.INSTANCE_IMPORT):
3217
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3218
                                 self.op.mode)
3219

    
3220
    if (not self.cfg.GetVGName() and
3221
        self.op.disk_template not in constants.DTS_NOT_LVM):
3222
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3223
                                 " instances")
3224

    
3225
    if self.op.mode == constants.INSTANCE_IMPORT:
3226
      src_node = getattr(self.op, "src_node", None)
3227
      src_path = getattr(self.op, "src_path", None)
3228
      if src_node is None or src_path is None:
3229
        raise errors.OpPrereqError("Importing an instance requires source"
3230
                                   " node and path options")
3231
      src_node_full = self.cfg.ExpandNodeName(src_node)
3232
      if src_node_full is None:
3233
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3234
      self.op.src_node = src_node = src_node_full
3235

    
3236
      if not os.path.isabs(src_path):
3237
        raise errors.OpPrereqError("The source path must be absolute")
3238

    
3239
      export_info = rpc.call_export_info(src_node, src_path)
3240

    
3241
      if not export_info:
3242
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3243

    
3244
      if not export_info.has_section(constants.INISECT_EXP):
3245
        raise errors.ProgrammerError("Corrupted export config")
3246

    
3247
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3248
      if (int(ei_version) != constants.EXPORT_VERSION):
3249
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3250
                                   (ei_version, constants.EXPORT_VERSION))
3251

    
3252
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3253
        raise errors.OpPrereqError("Can't import instance with more than"
3254
                                   " one data disk")
3255

    
3256
      # FIXME: are the old os-es, disk sizes, etc. useful?
3257
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3258
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3259
                                                         'disk0_dump'))
3260
      self.src_image = diskimage
3261
    else: # INSTANCE_CREATE
3262
      if getattr(self.op, "os_type", None) is None:
3263
        raise errors.OpPrereqError("No guest OS specified")
3264

    
3265
    #### instance parameters check
3266

    
3267
    # disk template and mirror node verification
3268
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3269
      raise errors.OpPrereqError("Invalid disk template name")
3270

    
3271
    # instance name verification
3272
    hostname1 = utils.HostInfo(self.op.instance_name)
3273

    
3274
    self.op.instance_name = instance_name = hostname1.name
3275
    instance_list = self.cfg.GetInstanceList()
3276
    if instance_name in instance_list:
3277
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3278
                                 instance_name)
3279

    
3280
    # ip validity checks
3281
    ip = getattr(self.op, "ip", None)
3282
    if ip is None or ip.lower() == "none":
3283
      inst_ip = None
3284
    elif ip.lower() == "auto":
3285
      inst_ip = hostname1.ip
3286
    else:
3287
      if not utils.IsValidIP(ip):
3288
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3289
                                   " like a valid IP" % ip)
3290
      inst_ip = ip
3291
    self.inst_ip = self.op.ip = inst_ip
3292

    
3293
    if self.op.start and not self.op.ip_check:
3294
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3295
                                 " adding an instance in start mode")
3296

    
3297
    if self.op.ip_check:
3298
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3299
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3300
                                   (hostname1.ip, instance_name))
3301

    
3302
    # MAC address verification
3303
    if self.op.mac != "auto":
3304
      if not utils.IsValidMac(self.op.mac.lower()):
3305
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3306
                                   self.op.mac)
3307

    
3308
    # bridge verification
3309
    bridge = getattr(self.op, "bridge", None)
3310
    if bridge is None:
3311
      self.op.bridge = self.cfg.GetDefBridge()
3312
    else:
3313
      self.op.bridge = bridge
3314

    
3315
    # boot order verification
3316
    if self.op.hvm_boot_order is not None:
3317
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3318
        raise errors.OpPrereqError("invalid boot order specified,"
3319
                                   " must be one or more of [acdn]")
3320
    # file storage checks
3321
    if (self.op.file_driver and
3322
        not self.op.file_driver in constants.FILE_DRIVER):
3323
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3324
                                 self.op.file_driver)
3325

    
3326
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3327
      raise errors.OpPrereqError("File storage directory not a relative"
3328
                                 " path")
3329
    #### allocator run
3330

    
3331
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3332
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3333
                                 " node must be given")
3334

    
3335
    if self.op.iallocator is not None:
3336
      self._RunAllocator()
3337

    
3338
    #### node related checks
3339

    
3340
    # check primary node
3341
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3342
    if pnode is None:
3343
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3344
                                 self.op.pnode)
3345
    self.op.pnode = pnode.name
3346
    self.pnode = pnode
3347
    self.secondaries = []
3348

    
3349
    # mirror node verification
3350
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3351
      if getattr(self.op, "snode", None) is None:
3352
        raise errors.OpPrereqError("The networked disk templates need"
3353
                                   " a mirror node")
3354

    
3355
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3356
      if snode_name is None:
3357
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3358
                                   self.op.snode)
3359
      elif snode_name == pnode.name:
3360
        raise errors.OpPrereqError("The secondary node cannot be"
3361
                                   " the primary node.")
3362
      self.secondaries.append(snode_name)
3363

    
3364
    req_size = _ComputeDiskSize(self.op.disk_template,
3365
                                self.op.disk_size, self.op.swap_size)
3366

    
3367
    # Check lv size requirements
3368
    if req_size is not None:
3369
      nodenames = [pnode.name] + self.secondaries
3370
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3371
      for node in nodenames:
3372
        info = nodeinfo.get(node, None)
3373
        if not info:
3374
          raise errors.OpPrereqError("Cannot get current information"
3375
                                     " from node '%s'" % nodeinfo)
3376
        vg_free = info.get('vg_free', None)
3377
        if not isinstance(vg_free, int):
3378
          raise errors.OpPrereqError("Can't compute free disk space on"
3379
                                     " node %s" % node)
3380
        if req_size > info['vg_free']:
3381
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3382
                                     " %d MB available, %d MB required" %
3383
                                     (node, info['vg_free'], req_size))
3384

    
3385
    # os verification
3386
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3387
    if not os_obj:
3388
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3389
                                 " primary node"  % self.op.os_type)
3390

    
3391
    if self.op.kernel_path == constants.VALUE_NONE:
3392
      raise errors.OpPrereqError("Can't set instance kernel to none")
3393

    
3394

    
3395
    # bridge check on primary node
3396
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3397
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3398
                                 " destination node '%s'" %
3399
                                 (self.op.bridge, pnode.name))
3400

    
3401
    if self.op.start:
3402
      self.instance_status = 'up'
3403
    else:
3404
      self.instance_status = 'down'
3405

    
3406
  def Exec(self, feedback_fn):
3407
    """Create and add the instance to the cluster.
3408

3409
    """
3410
    instance = self.op.instance_name
3411
    pnode_name = self.pnode.name
3412

    
3413
    if self.op.mac == "auto":
3414
      mac_address = self.cfg.GenerateMAC()
3415
    else:
3416
      mac_address = self.op.mac
3417

    
3418
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3419
    if self.inst_ip is not None:
3420
      nic.ip = self.inst_ip
3421

    
3422
    ht_kind = self.sstore.GetHypervisorType()
3423
    if ht_kind in constants.HTS_REQ_PORT:
3424
      network_port = self.cfg.AllocatePort()
3425
    else:
3426
      network_port = None
3427

    
3428
    # this is needed because os.path.join does not accept None arguments
3429
    if self.op.file_storage_dir is None:
3430
      string_file_storage_dir = ""
3431
    else:
3432
      string_file_storage_dir = self.op.file_storage_dir
3433

    
3434
    # build the full file storage dir path
3435
    file_storage_dir = os.path.normpath(os.path.join(
3436
                                        self.sstore.GetFileStorageDir(),
3437
                                        string_file_storage_dir, instance))
3438

    
3439

    
3440
    disks = _GenerateDiskTemplate(self.cfg,
3441
                                  self.op.disk_template,
3442
                                  instance, pnode_name,
3443
                                  self.secondaries, self.op.disk_size,
3444
                                  self.op.swap_size,
3445
                                  file_storage_dir,
3446
                                  self.op.file_driver)
3447

    
3448
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3449
                            primary_node=pnode_name,
3450
                            memory=self.op.mem_size,
3451
                            vcpus=self.op.vcpus,
3452
                            nics=[nic], disks=disks,
3453
                            disk_template=self.op.disk_template,
3454
                            status=self.instance_status,
3455
                            network_port=network_port,
3456
                            kernel_path=self.op.kernel_path,
3457
                            initrd_path=self.op.initrd_path,
3458
                            hvm_boot_order=self.op.hvm_boot_order,
3459
                            )
3460

    
3461
    feedback_fn("* creating instance disks...")
3462
    if not _CreateDisks(self.cfg, iobj):
3463
      _RemoveDisks(iobj, self.cfg)
3464
      raise errors.OpExecError("Device creation failed, reverting...")
3465

    
3466
    feedback_fn("adding instance %s to cluster config" % instance)
3467

    
3468
    self.cfg.AddInstance(iobj)
3469

    
3470
    if self.op.wait_for_sync:
3471
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3472
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3473
      # make sure the disks are not degraded (still sync-ing is ok)
3474
      time.sleep(15)
3475
      feedback_fn("* checking mirrors status")
3476
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3477
    else:
3478
      disk_abort = False
3479

    
3480
    if disk_abort:
3481
      _RemoveDisks(iobj, self.cfg)
3482
      self.cfg.RemoveInstance(iobj.name)
3483
      raise errors.OpExecError("There are some degraded disks for"
3484
                               " this instance")
3485

    
3486
    feedback_fn("creating os for instance %s on node %s" %
3487
                (instance, pnode_name))
3488

    
3489
    if iobj.disk_template != constants.DT_DISKLESS:
3490
      if self.op.mode == constants.INSTANCE_CREATE:
3491
        feedback_fn("* running the instance OS create scripts...")
3492
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3493
          raise errors.OpExecError("could not add os for instance %s"
3494
                                   " on node %s" %
3495
                                   (instance, pnode_name))
3496

    
3497
      elif self.op.mode == constants.INSTANCE_IMPORT:
3498
        feedback_fn("* running the instance OS import scripts...")
3499
        src_node = self.op.src_node
3500
        src_image = self.src_image
3501
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3502
                                                src_node, src_image):
3503
          raise errors.OpExecError("Could not import os for instance"
3504
                                   " %s on node %s" %
3505
                                   (instance, pnode_name))
3506
      else:
3507
        # also checked in the prereq part
3508
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3509
                                     % self.op.mode)
3510

    
3511
    if self.op.start:
3512
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3513
      feedback_fn("* starting instance...")
3514
      if not rpc.call_instance_start(pnode_name, iobj, None):
3515
        raise errors.OpExecError("Could not start instance")
3516

    
3517

    
3518
class LUConnectConsole(NoHooksLU):
3519
  """Connect to an instance's console.
3520

3521
  This is somewhat special in that it returns the command line that
3522
  you need to run on the master node in order to connect to the
3523
  console.
3524

3525
  """
3526
  _OP_REQP = ["instance_name"]
3527

    
3528
  def CheckPrereq(self):
3529
    """Check prerequisites.
3530

3531
    This checks that the instance is in the cluster.
3532

3533
    """
3534
    instance = self.cfg.GetInstanceInfo(
3535
      self.cfg.ExpandInstanceName(self.op.instance_name))
3536
    if instance is None:
3537
      raise errors.OpPrereqError("Instance '%s' not known" %
3538
                                 self.op.instance_name)
3539
    self.instance = instance
3540

    
3541
  def Exec(self, feedback_fn):
3542
    """Connect to the console of an instance
3543

3544
    """
3545
    instance = self.instance
3546
    node = instance.primary_node
3547

    
3548
    node_insts = rpc.call_instance_list([node])[node]
3549
    if node_insts is False:
3550
      raise errors.OpExecError("Can't connect to node %s." % node)
3551

    
3552
    if instance.name not in node_insts:
3553
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3554

    
3555
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3556

    
3557
    hyper = hypervisor.GetHypervisor()
3558
    console_cmd = hyper.GetShellCommandForConsole(instance)
3559

    
3560
    # build ssh cmdline
3561
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3562

    
3563

    
3564
class LUReplaceDisks(LogicalUnit):
3565
  """Replace the disks of an instance.
3566

3567
  """
3568
  HPATH = "mirrors-replace"
3569
  HTYPE = constants.HTYPE_INSTANCE
3570
  _OP_REQP = ["instance_name", "mode", "disks"]
3571

    
3572
  def _RunAllocator(self):
3573
    """Compute a new secondary node using an IAllocator.
3574

3575
    """
3576
    ial = IAllocator(self.cfg, self.sstore,
3577
                     mode=constants.IALLOCATOR_MODE_RELOC,
3578
                     name=self.op.instance_name,
3579
                     relocate_from=[self.sec_node])
3580

    
3581
    ial.Run(self.op.iallocator)
3582

    
3583
    if not ial.success:
3584
      raise errors.OpPrereqError("Can't compute nodes using"
3585
                                 " iallocator '%s': %s" % (self.op.iallocator,
3586
                                                           ial.info))
3587
    if len(ial.nodes) != ial.required_nodes:
3588
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3589
                                 " of nodes (%s), required %s" %
3590
                                 (len(ial.nodes), ial.required_nodes))
3591
    self.op.remote_node = ial.nodes[0]
3592
    logger.ToStdout("Selected new secondary for the instance: %s" %
3593
                    self.op.remote_node)
3594

    
3595
  def BuildHooksEnv(self):
3596
    """Build hooks env.
3597

3598
    This runs on the master, the primary and all the secondaries.
3599

3600
    """
3601
    env = {
3602
      "MODE": self.op.mode,
3603
      "NEW_SECONDARY": self.op.remote_node,
3604
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3605
      }
3606
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3607
    nl = [
3608
      self.sstore.GetMasterNode(),
3609
      self.instance.primary_node,
3610
      ]
3611
    if self.op.remote_node is not None:
3612
      nl.append(self.op.remote_node)
3613
    return env, nl, nl
3614

    
3615
  def CheckPrereq(self):
3616
    """Check prerequisites.
3617

3618
    This checks that the instance is in the cluster.
3619

3620
    """
3621
    if not hasattr(self.op, "remote_node"):
3622
      self.op.remote_node = None
3623

    
3624
    instance = self.cfg.GetInstanceInfo(
3625
      self.cfg.ExpandInstanceName(self.op.instance_name))
3626
    if instance is None:
3627
      raise errors.OpPrereqError("Instance '%s' not known" %
3628
                                 self.op.instance_name)
3629
    self.instance = instance
3630
    self.op.instance_name = instance.name
3631

    
3632
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3633
      raise errors.OpPrereqError("Instance's disk layout is not"
3634
                                 " network mirrored.")
3635

    
3636
    if len(instance.secondary_nodes) != 1:
3637
      raise errors.OpPrereqError("The instance has a strange layout,"
3638
                                 " expected one secondary but found %d" %
3639
                                 len(instance.secondary_nodes))
3640

    
3641
    self.sec_node = instance.secondary_nodes[0]
3642

    
3643
    ia_name = getattr(self.op, "iallocator", None)
3644
    if ia_name is not None:
3645
      if self.op.remote_node is not None:
3646
        raise errors.OpPrereqError("Give either the iallocator or the new"
3647
                                   " secondary, not both")
3648
      self._RunAllocator()
3649

    
3650
    remote_node = self.op.remote_node
3651
    if remote_node is not None:
3652
      remote_node = self.cfg.ExpandNodeName(remote_node)
3653
      if remote_node is None:
3654
        raise errors.OpPrereqError("Node '%s' not known" %
3655
                                   self.op.remote_node)
3656
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3657
    else:
3658
      self.remote_node_info = None
3659
    if remote_node == instance.primary_node:
3660
      raise errors.OpPrereqError("The specified node is the primary node of"
3661
                                 " the instance.")
3662
    elif remote_node == self.sec_node:
3663
      if self.op.mode == constants.REPLACE_DISK_SEC:
3664
        # this is for DRBD8, where we can't execute the same mode of
3665
        # replacement as for drbd7 (no different port allocated)
3666
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3667
                                   " replacement")
3668
      # the user gave the current secondary, switch to
3669
      # 'no-replace-secondary' mode for drbd7
3670
      remote_node = None
3671
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3672
        self.op.mode != constants.REPLACE_DISK_ALL):
3673
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3674
                                 " disks replacement, not individual ones")
3675
    if instance.disk_template == constants.DT_DRBD8:
3676
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3677
          remote_node is not None):
3678
        # switch to replace secondary mode
3679
        self.op.mode = constants.REPLACE_DISK_SEC
3680

    
3681
      if self.op.mode == constants.REPLACE_DISK_ALL:
3682
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3683
                                   " secondary disk replacement, not"
3684
                                   " both at once")
3685
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3686
        if remote_node is not None:
3687
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3688
                                     " the secondary while doing a primary"
3689
                                     " node disk replacement")
3690
        self.tgt_node = instance.primary_node
3691
        self.oth_node = instance.secondary_nodes[0]
3692
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3693
        self.new_node = remote_node # this can be None, in which case
3694
                                    # we don't change the secondary
3695
        self.tgt_node = instance.secondary_nodes[0]
3696
        self.oth_node = instance.primary_node
3697
      else:
3698
        raise errors.ProgrammerError("Unhandled disk replace mode")
3699

    
3700
    for name in self.op.disks:
3701
      if instance.FindDisk(name) is None:
3702
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3703
                                   (name, instance.name))
3704
    self.op.remote_node = remote_node
3705

    
3706
  def _ExecRR1(self, feedback_fn):
3707
    """Replace the disks of an instance.
3708

3709
    """
3710
    instance = self.instance
3711
    iv_names = {}
3712
    # start of work
3713
    if self.op.remote_node is None:
3714
      remote_node = self.sec_node
3715
    else:
3716
      remote_node = self.op.remote_node
3717
    cfg = self.cfg
3718
    for dev in instance.disks:
3719
      size = dev.size
3720
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3721
      names = _GenerateUniqueNames(cfg, lv_names)
3722
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3723
                                       remote_node, size, names)
3724
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3725
      logger.Info("adding new mirror component on secondary for %s" %
3726
                  dev.iv_name)
3727
      #HARDCODE
3728
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3729
                                        new_drbd, False,
3730
                                        _GetInstanceInfoText(instance)):
3731
        raise errors.OpExecError("Failed to create new component on secondary"
3732
                                 " node %s. Full abort, cleanup manually!" %
3733
                                 remote_node)
3734

    
3735
      logger.Info("adding new mirror component on primary")
3736
      #HARDCODE
3737
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3738
                                      instance, new_drbd,
3739
                                      _GetInstanceInfoText(instance)):
3740
        # remove secondary dev
3741
        cfg.SetDiskID(new_drbd, remote_node)
3742
        rpc.call_blockdev_remove(remote_node, new_drbd)
3743
        raise errors.OpExecError("Failed to create volume on primary!"
3744
                                 " Full abort, cleanup manually!!")
3745

    
3746
      # the device exists now
3747
      # call the primary node to add the mirror to md
3748
      logger.Info("adding new mirror component to md")
3749
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3750
                                           [new_drbd]):
3751
        logger.Error("Can't add mirror compoment to md!")
3752
        cfg.SetDiskID(new_drbd, remote_node)
3753
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3754
          logger.Error("Can't rollback on secondary")
3755
        cfg.SetDiskID(new_drbd, instance.primary_node)
3756
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3757
          logger.Error("Can't rollback on primary")
3758
        raise errors.OpExecError("Full abort, cleanup manually!!")
3759

    
3760
      dev.children.append(new_drbd)
3761
      cfg.AddInstance(instance)
3762

    
3763
    # this can fail as the old devices are degraded and _WaitForSync
3764
    # does a combined result over all disks, so we don't check its
3765
    # return value
3766
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3767

    
3768
    # so check manually all the devices
3769
    for name in iv_names:
3770
      dev, child, new_drbd = iv_names[name]
3771
      cfg.SetDiskID(dev, instance.primary_node)
3772
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3773
      if is_degr:
3774
        raise errors.OpExecError("MD device %s is degraded!" % name)
3775
      cfg.SetDiskID(new_drbd, instance.primary_node)
3776
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3777
      if is_degr:
3778
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3779

    
3780
    for name in iv_names:
3781
      dev, child, new_drbd = iv_names[name]
3782
      logger.Info("remove mirror %s component" % name)
3783
      cfg.SetDiskID(dev, instance.primary_node)
3784
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3785
                                              dev, [child]):
3786
        logger.Error("Can't remove child from mirror, aborting"
3787
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3788
        continue
3789

    
3790
      for node in child.logical_id[:2]:
3791
        logger.Info("remove child device on %s" % node)
3792
        cfg.SetDiskID(child, node)
3793
        if not rpc.call_blockdev_remove(node, child):
3794
          logger.Error("Warning: failed to remove device from node %s,"
3795
                       " continuing operation." % node)
3796

    
3797
      dev.children.remove(child)
3798

    
3799
      cfg.AddInstance(instance)
3800

    
3801
  def _ExecD8DiskOnly(self, feedback_fn):
3802
    """Replace a disk on the primary or secondary for dbrd8.
3803

3804
    The algorithm for replace is quite complicated:
3805
      - for each disk to be replaced:
3806
        - create new LVs on the target node with unique names
3807
        - detach old LVs from the drbd device
3808
        - rename old LVs to name_replaced.<time_t>
3809
        - rename new LVs to old LVs
3810
        - attach the new LVs (with the old names now) to the drbd device
3811
      - wait for sync across all devices
3812
      - for each modified disk:
3813
        - remove old LVs (which have the name name_replaced.<time_t>)
3814

3815
    Failures are not very well handled.
3816

3817
    """
3818
    steps_total = 6
3819
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3820
    instance = self.instance
3821
    iv_names = {}
3822
    vgname = self.cfg.GetVGName()
3823
    # start of work
3824
    cfg = self.cfg
3825
    tgt_node = self.tgt_node
3826
    oth_node = self.oth_node
3827

    
3828
    # Step: check device activation
3829
    self.proc.LogStep(1, steps_total, "check device existence")
3830
    info("checking volume groups")
3831
    my_vg = cfg.GetVGName()
3832
    results = rpc.call_vg_list([oth_node, tgt_node])
3833
    if not results:
3834
      raise errors.OpExecError("Can't list volume groups on the nodes")
3835
    for node in oth_node, tgt_node:
3836
      res = results.get(node, False)
3837
      if not res or my_vg not in res:
3838
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3839
                                 (my_vg, node))
3840
    for dev in instance.disks:
3841
      if not dev.iv_name in self.op.disks:
3842
        continue
3843
      for node in tgt_node, oth_node:
3844
        info("checking %s on %s" % (dev.iv_name, node))
3845
        cfg.SetDiskID(dev, node)
3846
        if not rpc.call_blockdev_find(node, dev):
3847
          raise errors.OpExecError("Can't find device %s on node %s" %
3848
                                   (dev.iv_name, node))
3849

    
3850
    # Step: check other node consistency
3851
    self.proc.LogStep(2, steps_total, "check peer consistency")
3852
    for dev in instance.disks:
3853
      if not dev.iv_name in self.op.disks:
3854
        continue
3855
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3856
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3857
                                   oth_node==instance.primary_node):
3858
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3859
                                 " to replace disks on this node (%s)" %
3860
                                 (oth_node, tgt_node))
3861

    
3862
    # Step: create new storage
3863
    self.proc.LogStep(3, steps_total, "allocate new storage")
3864
    for dev in instance.disks:
3865
      if not dev.iv_name in self.op.disks:
3866
        continue
3867
      size = dev.size
3868
      cfg.SetDiskID(dev, tgt_node)
3869
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3870
      names = _GenerateUniqueNames(cfg, lv_names)
3871
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3872
                             logical_id=(vgname, names[0]))
3873
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3874
                             logical_id=(vgname, names[1]))
3875
      new_lvs = [lv_data, lv_meta]
3876
      old_lvs = dev.children
3877
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3878
      info("creating new local storage on %s for %s" %
3879
           (tgt_node, dev.iv_name))
3880
      # since we *always* want to create this LV, we use the
3881
      # _Create...OnPrimary (which forces the creation), even if we
3882
      # are talking about the secondary node
3883
      for new_lv in new_lvs:
3884
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3885
                                        _GetInstanceInfoText(instance)):
3886
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3887
                                   " node '%s'" %
3888
                                   (new_lv.logical_id[1], tgt_node))
3889

    
3890
    # Step: for each lv, detach+rename*2+attach
3891
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3892
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3893
      info("detaching %s drbd from local storage" % dev.iv_name)
3894
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3895
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3896
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3897
      #dev.children = []
3898
      #cfg.Update(instance)
3899

    
3900
      # ok, we created the new LVs, so now we know we have the needed
3901
      # storage; as such, we proceed on the target node to rename
3902
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3903
      # using the assumption that logical_id == physical_id (which in
3904
      # turn is the unique_id on that node)
3905

    
3906
      # FIXME(iustin): use a better name for the replaced LVs
3907
      temp_suffix = int(time.time())
3908
      ren_fn = lambda d, suff: (d.physical_id[0],
3909
                                d.physical_id[1] + "_replaced-%s" % suff)
3910
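      # e.g. (illustrative): an old LV whose physical_id is
      # ("xenvg", "abc123.sda_data") would be renamed to
      # ("xenvg", "abc123.sda_data_replaced-1199145600") on the target node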
      # build the rename list based on what LVs exist on the node
3911
      rlist = []
3912
      for to_ren in old_lvs:
3913
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3914
        if find_res is not None: # device exists
3915
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3916

    
3917
      info("renaming the old LVs on the target node")
3918
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3919
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3920
      # now we rename the new LVs to the old LVs
3921
      info("renaming the new LVs on the target node")
3922
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3923
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3924
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3925

    
3926
      for old, new in zip(old_lvs, new_lvs):
3927
        new.logical_id = old.logical_id
3928
        cfg.SetDiskID(new, tgt_node)
3929

    
3930
      for disk in old_lvs:
3931
        disk.logical_id = ren_fn(disk, temp_suffix)
3932
        cfg.SetDiskID(disk, tgt_node)
3933

    
3934
      # now that the new lvs have the old name, we can add them to the device
3935
      info("adding new mirror component on %s" % tgt_node)
3936
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3937
        for new_lv in new_lvs:
3938
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3939
            warning("Can't rollback device %s", hint="manually cleanup unused"
3940
                    " logical volumes")
3941
        raise errors.OpExecError("Can't add local storage to drbd")
3942

    
3943
      dev.children = new_lvs
3944
      cfg.Update(instance)
3945

    
3946
    # Step: wait for sync
3947

    
3948
    # this can fail as the old devices are degraded and _WaitForSync
3949
    # does a combined result over all disks, so we don't check its
3950
    # return value
3951
    self.proc.LogStep(5, steps_total, "sync devices")
3952
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3953

    
3954
    # so check manually all the devices
3955
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3956
      cfg.SetDiskID(dev, instance.primary_node)
3957
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3958
      if is_degr:
3959
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3960

    
3961
    # Step: remove old storage
3962
    self.proc.LogStep(6, steps_total, "removing old storage")
3963
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3964
      info("remove logical volumes for %s" % name)
3965
      for lv in old_lvs:
3966
        cfg.SetDiskID(lv, tgt_node)
3967
        if not rpc.call_blockdev_remove(tgt_node, lv):
3968
          warning("Can't remove old LV", hint="manually remove unused LVs")
3969
          continue
3970

    
3971
  def _ExecD8Secondary(self, feedback_fn):
3972
    """Replace the secondary node for drbd8.
3973

3974
    The algorithm for replace is quite complicated:
3975
      - for all disks of the instance:
3976
        - create new LVs on the new node with same names
3977
        - shutdown the drbd device on the old secondary
3978
        - disconnect the drbd network on the primary
3979
        - create the drbd device on the new secondary
3980
        - network attach the drbd on the primary, using an artifice:
3981
          the drbd code for Attach() will connect to the network if it
3982
          finds a device which is connected to the correct local disks but
3983
          not network enabled
3984
      - wait for sync across all devices
3985
      - remove all disks from the old secondary
3986

3987
    Failures are not very well handled.
3988

3989
    """
3990
    steps_total = 6
3991
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3992
    instance = self.instance
3993
    iv_names = {}
3994
    vgname = self.cfg.GetVGName()
3995
    # start of work
3996
    cfg = self.cfg
3997
    old_node = self.tgt_node
3998
    new_node = self.new_node
3999
    pri_node = instance.primary_node
4000

    
4001
    # Step: check device activation
4002
    self.proc.LogStep(1, steps_total, "check device existence")
4003
    info("checking volume groups")
4004
    my_vg = cfg.GetVGName()
4005
    results = rpc.call_vg_list([pri_node, new_node])
4006
    if not results:
4007
      raise errors.OpExecError("Can't list volume groups on the nodes")
4008
    for node in pri_node, new_node:
4009
      res = results.get(node, False)
4010
      if not res or my_vg not in res:
4011
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4012
                                 (my_vg, node))
4013
    for dev in instance.disks:
4014
      if not dev.iv_name in self.op.disks:
4015
        continue
4016
      info("checking %s on %s" % (dev.iv_name, pri_node))
4017
      cfg.SetDiskID(dev, pri_node)
4018
      if not rpc.call_blockdev_find(pri_node, dev):
4019
        raise errors.OpExecError("Can't find device %s on node %s" %
4020
                                 (dev.iv_name, pri_node))
4021

    
4022
    # Step: check other node consistency
4023
    self.proc.LogStep(2, steps_total, "check peer consistency")
4024
    for dev in instance.disks:
4025
      if not dev.iv_name in self.op.disks:
4026
        continue
4027
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4028
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4029
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4030
                                 " unsafe to replace the secondary" %
4031
                                 pri_node)
4032

    
4033
    # Step: create new storage
4034
    self.proc.LogStep(3, steps_total, "allocate new storage")
4035
    for dev in instance.disks:
4036
      size = dev.size
4037
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4038
      # since we *always* want to create this LV, we use the
4039
      # _Create...OnPrimary (which forces the creation), even if we
4040
      # are talking about the secondary node
4041
      for new_lv in dev.children:
4042
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4043
                                        _GetInstanceInfoText(instance)):
4044
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4045
                                   " node '%s'" %
4046
                                   (new_lv.logical_id[1], new_node))
4047

    
4048
      iv_names[dev.iv_name] = (dev, dev.children)
4049

    
4050
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4051
    for dev in instance.disks:
4052
      size = dev.size
4053
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4054
      # create new devices on new_node
4055
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4056
                              logical_id=(pri_node, new_node,
4057
                                          dev.logical_id[2]),
4058
                              children=dev.children)
4059
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4060
                                        new_drbd, False,
4061
                                      _GetInstanceInfoText(instance)):
4062
        raise errors.OpExecError("Failed to create new DRBD on"
4063
                                 " node '%s'" % new_node)
4064

    
4065
    for dev in instance.disks:
4066
      # we have new devices, shutdown the drbd on the old secondary
4067
      info("shutting down drbd for %s on old node" % dev.iv_name)
4068
      cfg.SetDiskID(dev, old_node)
4069
      if not rpc.call_blockdev_shutdown(old_node, dev):
4070
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4071
                hint="Please cleanup this device manually as soon as possible")
4072

    
4073
    info("detaching primary drbds from the network (=> standalone)")
4074
    done = 0
4075
    for dev in instance.disks:
4076
      cfg.SetDiskID(dev, pri_node)
4077
      # set the physical (unique in bdev terms) id to None, meaning
4078
      # detach from network
4079
      dev.physical_id = (None,) * len(dev.physical_id)
4080
      # and 'find' the device, which will 'fix' it to match the
4081
      # standalone state
4082
      if rpc.call_blockdev_find(pri_node, dev):
4083
        done += 1
4084
      else:
4085
        warning("Failed to detach drbd %s from network, unusual case" %
4086
                dev.iv_name)
4087

    
4088
    if not done:
4089
      # no detaches succeeded (very unlikely)
4090
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4091

    
4092
    # if we managed to detach at least one, we update all the disks of
4093
    # the instance to point to the new secondary
4094
    info("updating instance configuration")
4095
    for dev in instance.disks:
4096
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4097
      cfg.SetDiskID(dev, pri_node)
4098
    cfg.Update(instance)
4099

    
4100
    # and now perform the drbd attach
4101
    info("attaching primary drbds to new secondary (standalone => connected)")
4102
    failures = []
4103
    for dev in instance.disks:
4104
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4105
      # since the attach is smart, it's enough to 'find' the device,
4106
      # it will automatically activate the network, if the physical_id
4107
      # is correct
4108
      cfg.SetDiskID(dev, pri_node)
4109
      if not rpc.call_blockdev_find(pri_node, dev):
4110
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4111
                "please do a gnt-instance info to see the status of disks")
4112

    
4113
    # this can fail as the old devices are degraded and _WaitForSync
4114
    # does a combined result over all disks, so we don't check its
4115
    # return value
4116
    self.proc.LogStep(5, steps_total, "sync devices")
4117
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4118

    
4119
    # so check manually all the devices
4120
    for name, (dev, old_lvs) in iv_names.iteritems():
4121
      cfg.SetDiskID(dev, pri_node)
4122
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4123
      if is_degr:
4124
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4125

    
4126
    self.proc.LogStep(6, steps_total, "removing old storage")
4127
    for name, (dev, old_lvs) in iv_names.iteritems():
4128
      info("remove logical volumes for %s" % name)
4129
      for lv in old_lvs:
4130
        cfg.SetDiskID(lv, old_node)
4131
        if not rpc.call_blockdev_remove(old_node, lv):
4132
          warning("Can't remove LV on old secondary",
4133
                  hint="Cleanup stale volumes by hand")
4134

    
4135
  def Exec(self, feedback_fn):
4136
    """Execute disk replacement.
4137

4138
    This dispatches the disk replacement to the appropriate handler.
4139

4140
    """
4141
    instance = self.instance
4142
    if instance.disk_template == constants.DT_REMOTE_RAID1:
4143
      fn = self._ExecRR1
4144
    elif instance.disk_template == constants.DT_DRBD8:
4145
      if self.op.remote_node is None:
4146
        fn = self._ExecD8DiskOnly
4147
      else:
4148
        fn = self._ExecD8Secondary
4149
    else:
4150
      raise errors.ProgrammerError("Unhandled disk replacement case")
4151
    return fn(feedback_fn)
4152

    
4153

    
4154
class LUQueryInstanceData(NoHooksLU):
4155
  """Query runtime instance data.
4156

4157
  """
4158
  _OP_REQP = ["instances"]
4159

    
4160
  def CheckPrereq(self):
4161
    """Check prerequisites.
4162

4163
    This only checks the optional instance list against the existing names.
4164

4165
    """
4166
    if not isinstance(self.op.instances, list):
4167
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4168
    if self.op.instances:
4169
      self.wanted_instances = []
4170
      names = self.op.instances
4171
      for name in names:
4172
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4173
        if instance is None:
4174
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4175
        self.wanted_instances.append(instance)
4176
    else:
4177
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4178
                               in self.cfg.GetInstanceList()]
4179
    return
4180

    
4181

    
4182
  def _ComputeDiskStatus(self, instance, snode, dev):
4183
    """Compute block device status.
4184

4185
    """
4186
    self.cfg.SetDiskID(dev, instance.primary_node)
4187
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4188
    if dev.dev_type in constants.LDS_DRBD:
4189
      # we change the snode then (otherwise we use the one passed in)
4190
      if dev.logical_id[0] == instance.primary_node:
4191
        snode = dev.logical_id[1]
4192
      else:
4193
        snode = dev.logical_id[0]
4194

    
4195
    if snode:
4196
      self.cfg.SetDiskID(dev, snode)
4197
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4198
    else:
4199
      dev_sstatus = None
4200

    
4201
    if dev.children:
4202
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4203
                      for child in dev.children]
4204
    else:
4205
      dev_children = []
4206

    
4207
    data = {
4208
      "iv_name": dev.iv_name,
4209
      "dev_type": dev.dev_type,
4210
      "logical_id": dev.logical_id,
4211
      "physical_id": dev.physical_id,
4212
      "pstatus": dev_pstatus,
4213
      "sstatus": dev_sstatus,
4214
      "children": dev_children,
4215
      }
4216

    
4217
    return data
4218
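  # Illustrative sketch (made-up values): for a drbd8-backed "sda" the dict
  # above nests one level per device in the tree, e.g.
  #
  #   {"iv_name": "sda", "dev_type": constants.LD_DRBD8,
  #    "pstatus": <blockdev_find result on the primary>,
  #    "sstatus": <blockdev_find result on the secondary>,
  #    "children": [<same structure for the data LV>,
  #                 <same structure for the meta LV>]}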

    
4219
  def Exec(self, feedback_fn):
4220
    """Gather and return data"""
4221
    result = {}
4222
    for instance in self.wanted_instances:
4223
      remote_info = rpc.call_instance_info(instance.primary_node,
4224
                                                instance.name)
4225
      if remote_info and "state" in remote_info:
4226
        remote_state = "up"
4227
      else:
4228
        remote_state = "down"
4229
      if instance.status == "down":
4230
        config_state = "down"
4231
      else:
4232
        config_state = "up"
4233

    
4234
      disks = [self._ComputeDiskStatus(instance, None, device)
4235
               for device in instance.disks]
4236

    
4237
      idict = {
4238
        "name": instance.name,
4239
        "config_state": config_state,
4240
        "run_state": remote_state,
4241
        "pnode": instance.primary_node,
4242
        "snodes": instance.secondary_nodes,
4243
        "os": instance.os,
4244
        "memory": instance.memory,
4245
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4246
        "disks": disks,
4247
        "network_port": instance.network_port,
4248
        "vcpus": instance.vcpus,
4249
        "kernel_path": instance.kernel_path,
4250
        "initrd_path": instance.initrd_path,
4251
        "hvm_boot_order": instance.hvm_boot_order,
4252
        }
4253

    
4254
      result[instance.name] = idict
4255

    
4256
    return result
4257

    
4258

    
4259
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_params.count(None) == len(all_params):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)
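    # The result built above is a list of (parameter, new value) pairs
    # describing what was changed, e.g. [("mem", 512), ("vcpus", 2)] (values
    # purely illustrative); callers can use it for user feedback.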

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
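    # results collects (path, tag) pairs for every matching tag, e.g.
    # [("/instances/inst1.example.com", "web")] (example values only).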
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Add a list of tags to a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode's key list, _ALLO_KEYS or
      _RELO_KEYS, are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
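  # Illustrative construction for the relocation mode (names are
  # hypothetical); LUTestAllocator below builds its instances the same way:
  #   IAllocator(cfg, sstore, mode=constants.IALLOCATOR_MODE_RELOC,
  #              name="inst1.example.com",
  #              relocate_from=["node2.example.com"])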

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
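    # Illustrative shape of self.in_data at this point (all values are
    # hypothetical):
    #   {"version": 1, "cluster_name": "cluster.example.com",
    #    "cluster_tags": [], "hypervisor_type": "xen-3.0",
    #    "nodes": {"node1.example.com": {"total_memory": 4096, ...}},
    #    "instances": {"inst1.example.com": {"memory": 512, ...}},
    #    "request": {"type": "allocate", "required_nodes": 2, ...}}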

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
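    # A well-formed reply therefore looks like (illustrative values only):
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node2.example.com", "node3.example.com"]}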


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result