
root / lib / cmdlib.py @ 9ac99fda


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements (REQ_CLUSTER,
58
      REQ_MASTER); note that all commands require root permissions
59

60
  """
61
  HPATH = None
62
  HTYPE = None
63
  _OP_REQP = []
64
  REQ_CLUSTER = True
65
  REQ_MASTER = True
66

    
67
  def __init__(self, processor, op, cfg, sstore):
68
    """Constructor for LogicalUnit.
69

70
    This needs to be overridden in derived classes in order to check op
71
    validity.
72

73
    """
74
    self.proc = processor
75
    self.op = op
76
    self.cfg = cfg
77
    self.sstore = sstore
78
    self.__ssh = None
79

    
80
    for attr_name in self._OP_REQP:
81
      attr_val = getattr(op, attr_name, None)
82
      if attr_val is None:
83
        raise errors.OpPrereqError("Required parameter '%s' missing" %
84
                                   attr_name)
85
    if self.REQ_CLUSTER:
86
      if not cfg.IsCluster():
87
        raise errors.OpPrereqError("Cluster not initialized yet,"
88
                                   " use 'gnt-cluster init' first.")
89
      if self.REQ_MASTER:
90
        master = sstore.GetMasterNode()
91
        if master != utils.HostInfo().name:
92
          raise errors.OpPrereqError("Commands must be run on the master"
93
                                     " node %s" % master)
94

    
95
  def __GetSSH(self):
96
    """Returns the SshRunner object
97

98
    """
99
    if not self.__ssh:
100
      self.__ssh = ssh.SshRunner(self.sstore)
101
    return self.__ssh
102

    
103
  ssh = property(fget=__GetSSH)
104

    
105
  def CheckPrereq(self):
106
    """Check prerequisites for this LU.
107

108
    This method should check that the prerequisites for the execution
109
    of this LU are fulfilled. It can do internode communication, but
110
    it should be idempotent - no cluster or system changes are
111
    allowed.
112

113
    The method should raise errors.OpPrereqError in case something is
114
    not fulfilled. Its return value is ignored.
115

116
    This method should also update all the parameters of the opcode to
117
    their canonical form; e.g. a short node name must be fully
118
    expanded after this method has successfully completed (so that
119
    hooks, logging, etc. work correctly).
120

121
    """
122
    raise NotImplementedError
123

    
124
  def Exec(self, feedback_fn):
125
    """Execute the LU.
126

127
    This method should implement the actual work. It should raise
128
    errors.OpExecError for failures that are somewhat dealt with in
129
    code, or expected.
130

131
    """
132
    raise NotImplementedError
133

    
134
  def BuildHooksEnv(self):
135
    """Build hooks environment for this LU.
136

137
    This method should return a three-element tuple consisting of: a dict
138
    containing the environment that will be used for running the
139
    specific hook for this LU, a list of node names on which the hook
140
    should run before the execution, and a list of node names on which
141
    the hook should run after the execution.
142

143
    The keys of the dict must not be prefixed with 'GANETI_', as this will
144
    be handled in the hooks runner. Also note additional keys will be
145
    added by the hooks runner. If the LU doesn't define any
146
    environment, an empty dict (and not None) should be returned.
147

148
    As for the node lists, the master should not be included in
149
    them, as it will be added by the hooks runner in case this LU
150
    requires a cluster to run on (otherwise we don't have a node
151
    list). If there are no nodes, an empty list (and not None) should
152
    be returned.
153

154
    Note that if the HPATH for a LU class is None, this function will
155
    not be called.
156

157
    """
158
    raise NotImplementedError
159

    
160

    
161
class NoHooksLU(LogicalUnit):
162
  """Simple LU which runs no hooks.
163

164
  This LU is intended as a parent for other LogicalUnits which will
165
  run no hooks, in order to reduce duplicate code.
166

167
  """
168
  HPATH = None
169
  HTYPE = None
170

    
171
  def BuildHooksEnv(self):
172
    """Build hooks env.
173

174
    This is a no-op, since we don't run hooks.
175

176
    """
177
    return {}, [], []
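# Illustrative sketch (not part of the original module): a minimal LogicalUnit
# subclass following the rules documented in the LogicalUnit docstring above.
# The opcode field "dry_run" and the hook path "example-noop" are made up for
# illustration only.
#
#   class LUExampleNoop(LogicalUnit):
#     """No-op LU, shown only to illustrate the subclass contract."""
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["dry_run"]          # presence checked in LogicalUnit.__init__
#
#     def CheckPrereq(self):
#       # canonicalise/validate opcode fields; raise OpPrereqError on problems
#       if self.op.dry_run not in (True, False):
#         raise errors.OpPrereqError("dry_run must be a boolean")
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.sstore.GetClusterName()}
#       return env, [], []            # env, pre-hook nodes, post-hook nodes
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do (dry_run=%s)" % self.op.dry_run)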
178

    
179

    
180
def _AddHostToEtcHosts(hostname):
181
  """Wrapper around utils.SetEtcHostsEntry.
182

183
  """
184
  hi = utils.HostInfo(name=hostname)
185
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
186

    
187

    
188
def _RemoveHostFromEtcHosts(hostname):
189
  """Wrapper around utils.RemoveEtcHostsEntry.
190

191
  """
192
  hi = utils.HostInfo(name=hostname)
193
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
194
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
195

    
196

    
197
def _GetWantedNodes(lu, nodes):
198
  """Returns list of checked and expanded node names.
199

200
  Args:
201
    nodes: List of node names (strings), or an empty list for all nodes
202

203
  """
204
  if not isinstance(nodes, list):
205
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
206

    
207
  if nodes:
208
    wanted = []
209

    
210
    for name in nodes:
211
      node = lu.cfg.ExpandNodeName(name)
212
      if node is None:
213
        raise errors.OpPrereqError("No such node name '%s'" % name)
214
      wanted.append(node)
215

    
216
  else:
217
    wanted = lu.cfg.GetNodeList()
218
  return utils.NiceSort(wanted)
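# Illustrative examples (not part of the original module), assuming a config
# that knows about nodes "node1.example.com" and "node2.example.com":
#
#   _GetWantedNodes(lu, ["node1"])  ->  ["node1.example.com"]  (name expanded)
#   _GetWantedNodes(lu, [])         ->  all node names, nicely sorted
#   _GetWantedNodes(lu, None)       ->  raises OpPrereqError (not a list)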
219

    
220

    
221
def _GetWantedInstances(lu, instances):
222
  """Returns list of checked and expanded instance names.
223

224
  Args:
225
    instances: List of instance names (strings), or an empty list for all
      instances
226

227
  """
228
  if not isinstance(instances, list):
229
    raise errors.OpPrereqError("Invalid argument type 'instances'")
230

    
231
  if instances:
232
    wanted = []
233

    
234
    for name in instances:
235
      instance = lu.cfg.ExpandInstanceName(name)
236
      if instance is None:
237
        raise errors.OpPrereqError("No such instance name '%s'" % name)
238
      wanted.append(instance)
239

    
240
  else:
241
    wanted = lu.cfg.GetInstanceList()
242
  return utils.NiceSort(wanted)
243

    
244

    
245
def _CheckOutputFields(static, dynamic, selected):
246
  """Checks whether all selected fields are valid.
247

248
  Args:
249
    static: Static (configuration-derived) field names
250
    dynamic: Dynamic (live data) field names
    selected: Field names requested by the caller
251

252
  """
253
  static_fields = frozenset(static)
254
  dynamic_fields = frozenset(dynamic)
255

    
256
  all_fields = static_fields | dynamic_fields
257

    
258
  if not all_fields.issuperset(selected):
259
    raise errors.OpPrereqError("Unknown output fields selected: %s"
260
                               % ",".join(frozenset(selected).
261
                                          difference(all_fields)))
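# Illustrative examples (not part of the original module): this helper only
# validates membership; the static/dynamic split matters to the callers.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes silently
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "foo"])     # raises OpPrereqError
#                                                    # ("... selected: foo")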
262

    
263

    
264
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
265
                          memory, vcpus, nics):
266
  """Builds instance related env variables for hooks from single variables.
267

268
  Args:
269
    secondary_nodes: List of secondary nodes as strings
270
  """
271
  env = {
272
    "OP_TARGET": name,
273
    "INSTANCE_NAME": name,
274
    "INSTANCE_PRIMARY": primary_node,
275
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
276
    "INSTANCE_OS_TYPE": os_type,
277
    "INSTANCE_STATUS": status,
278
    "INSTANCE_MEMORY": memory,
279
    "INSTANCE_VCPUS": vcpus,
280
  }
281

    
282
  if nics:
283
    nic_count = len(nics)
284
    for idx, (ip, bridge, mac) in enumerate(nics):
285
      if ip is None:
286
        ip = ""
287
      env["INSTANCE_NIC%d_IP" % idx] = ip
288
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
289
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
290
  else:
291
    nic_count = 0
292

    
293
  env["INSTANCE_NIC_COUNT"] = nic_count
294

    
295
  return env
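# Illustrative example (not part of the original module): the environment
# built above for a hypothetical single-NIC instance; all names and values
# are made up.
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                         ["node2.example.com"], "debian-etch", "up",
#                         512, 1,
#                         [("198.51.100.10", "xen-br0", "aa:00:00:11:22:33")])
#   == {"OP_TARGET": "inst1.example.com",
#       "INSTANCE_NAME": "inst1.example.com",
#       "INSTANCE_PRIMARY": "node1.example.com",
#       "INSTANCE_SECONDARIES": "node2.example.com",
#       "INSTANCE_OS_TYPE": "debian-etch",
#       "INSTANCE_STATUS": "up",
#       "INSTANCE_MEMORY": 512,
#       "INSTANCE_VCPUS": 1,
#       "INSTANCE_NIC0_IP": "198.51.100.10",
#       "INSTANCE_NIC0_BRIDGE": "xen-br0",
#       "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#       "INSTANCE_NIC_COUNT": 1}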
296

    
297

    
298
def _BuildInstanceHookEnvByObject(instance, override=None):
299
  """Builds instance related env variables for hooks from an object.
300

301
  Args:
302
    instance: objects.Instance object of instance
303
    override: dict of values to override
304
  """
305
  args = {
306
    'name': instance.name,
307
    'primary_node': instance.primary_node,
308
    'secondary_nodes': instance.secondary_nodes,
309
    'os_type': instance.os,
310
    'status': instance.status,
311
    'memory': instance.memory,
312
    'vcpus': instance.vcpus,
313
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
314
  }
315
  if override:
316
    args.update(override)
317
  return _BuildInstanceHookEnv(**args)
318

    
319

    
320
def _HasValidVG(vglist, vgname):
321
  """Checks if the volume group list is valid.
322

323
  A non-None return value means there's an error, and the return value
324
  is the error message.
325

326
  """
327
  vgsize = vglist.get(vgname, None)
328
  if vgsize is None:
329
    return "volume group '%s' missing" % vgname
330
  elif vgsize < 20480:
331
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
332
            (vgname, vgsize))
333
  return None
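# Illustrative examples (not part of the original module); sizes are in MiB,
# matching the error message above:
#
#   _HasValidVG({"xenvg": 102400}, "xenvg")  ->  None (valid)
#   _HasValidVG({}, "xenvg")                 ->  "volume group 'xenvg' missing"
#   _HasValidVG({"xenvg": 10240}, "xenvg")   ->  "volume group 'xenvg' too
#                                                 small ..." (below 20480 MiB)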
334

    
335

    
336
def _InitSSHSetup(node):
337
  """Setup the SSH configuration for the cluster.
338

339

340
  This generates a dsa keypair for root, adds the pub key to the
341
  permitted hosts and adds the hostkey to its own known hosts.
342

343
  Args:
344
    node: the name of this host as a fqdn
345

346
  """
347
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
348

    
349
  for name in priv_key, pub_key:
350
    if os.path.exists(name):
351
      utils.CreateBackup(name)
352
    utils.RemoveFile(name)
353

    
354
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
355
                         "-f", priv_key,
356
                         "-q", "-N", ""])
357
  if result.failed:
358
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
359
                             result.output)
360

    
361
  f = open(pub_key, 'r')
362
  try:
363
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
364
  finally:
365
    f.close()
366

    
367

    
368
def _InitGanetiServerSetup(ss):
369
  """Setup the necessary configuration for the initial node daemon.
370

371
  This creates the nodepass file containing the shared password for
372
  the cluster and also generates the SSL certificate.
373

374
  """
375
  # Create pseudo random password
376
  randpass = sha.new(os.urandom(64)).hexdigest()
377
  # and write it into sstore
378
  ss.SetKey(ss.SS_NODED_PASS, randpass)
379

    
380
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
381
                         "-days", str(365*5), "-nodes", "-x509",
382
                         "-keyout", constants.SSL_CERT_FILE,
383
                         "-out", constants.SSL_CERT_FILE, "-batch"])
384
  if result.failed:
385
    raise errors.OpExecError("could not generate server ssl cert, command"
386
                             " %s had exitcode %s and error message %s" %
387
                             (result.cmd, result.exit_code, result.output))
388

    
389
  os.chmod(constants.SSL_CERT_FILE, 0400)
390

    
391
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
392

    
393
  if result.failed:
394
    raise errors.OpExecError("Could not start the node daemon, command %s"
395
                             " had exitcode %s and error %s" %
396
                             (result.cmd, result.exit_code, result.output))
397

    
398

    
399
def _CheckInstanceBridgesExist(instance):
400
  """Check that the brigdes needed by an instance exist.
401

402
  """
403
  # check bridges existence
404
  brlist = [nic.bridge for nic in instance.nics]
405
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
406
    raise errors.OpPrereqError("one or more target bridges %s does not"
407
                               " exist on destination node '%s'" %
408
                               (brlist, instance.primary_node))
409

    
410

    
411
class LUInitCluster(LogicalUnit):
412
  """Initialise the cluster.
413

414
  """
415
  HPATH = "cluster-init"
416
  HTYPE = constants.HTYPE_CLUSTER
417
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
418
              "def_bridge", "master_netdev", "file_storage_dir"]
419
  REQ_CLUSTER = False
420

    
421
  def BuildHooksEnv(self):
422
    """Build hooks env.
423

424
    Notes: Since we don't require a cluster, we must manually add
425
    ourselves to the post-run node list.
426

427
    """
428
    env = {"OP_TARGET": self.op.cluster_name}
429
    return env, [], [self.hostname.name]
430

    
431
  def CheckPrereq(self):
432
    """Verify that the passed name is a valid one.
433

434
    """
435
    if config.ConfigWriter.IsCluster():
436
      raise errors.OpPrereqError("Cluster is already initialised")
437

    
438
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
439
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
440
        raise errors.OpPrereqError("Please prepare the cluster VNC"
441
                                   "password file %s" %
442
                                   constants.VNC_PASSWORD_FILE)
443

    
444
    self.hostname = hostname = utils.HostInfo()
445

    
446
    if hostname.ip.startswith("127."):
447
      raise errors.OpPrereqError("This host's IP resolves to the private"
448
                                 " range (%s). Please fix DNS or %s." %
449
                                 (hostname.ip, constants.ETC_HOSTS))
450

    
451
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
452
                         source=constants.LOCALHOST_IP_ADDRESS):
453
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
454
                                 " to %s,\nbut this ip address does not"
455
                                 " belong to this host."
456
                                 " Aborting." % hostname.ip)
457

    
458
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
459

    
460
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
461
                     timeout=5):
462
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
463

    
464
    secondary_ip = getattr(self.op, "secondary_ip", None)
465
    if secondary_ip and not utils.IsValidIP(secondary_ip):
466
      raise errors.OpPrereqError("Invalid secondary ip given")
467
    if (secondary_ip and
468
        secondary_ip != hostname.ip and
469
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
470
                           source=constants.LOCALHOST_IP_ADDRESS))):
471
      raise errors.OpPrereqError("You gave %s as secondary IP,"
472
                                 " but it does not belong to this host." %
473
                                 secondary_ip)
474
    self.secondary_ip = secondary_ip
475

    
476
    if not hasattr(self.op, "vg_name"):
477
      self.op.vg_name = None
478
    # if vg_name not None, checks if volume group is valid
479
    if self.op.vg_name:
480
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
481
      if vgstatus:
482
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
483
                                   " you are not using lvm" % vgstatus)
484

    
485
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
486

    
487
    if not os.path.isabs(self.op.file_storage_dir):
488
      raise errors.OpPrereqError("The file storage directory you have is"
489
                                 " not an absolute path.")
490

    
491
    if not os.path.exists(self.op.file_storage_dir):
492
      try:
493
        os.makedirs(self.op.file_storage_dir, 0750)
494
      except OSError, err:
495
        raise errors.OpPrereqError("Cannot create file storage directory"
496
                                   " '%s': %s" %
497
                                   (self.op.file_storage_dir, err))
498

    
499
    if not os.path.isdir(self.op.file_storage_dir):
500
      raise errors.OpPrereqError("The file storage directory '%s' is not"
501
                                 " a directory." % self.op.file_storage_dir)
502

    
503
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
504
                    self.op.mac_prefix):
505
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
506
                                 self.op.mac_prefix)
507

    
508
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
509
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
510
                                 self.op.hypervisor_type)
511

    
512
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
513
    if result.failed:
514
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
515
                                 (self.op.master_netdev,
516
                                  result.output.strip()))
517

    
518
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
519
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
520
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
521
                                 " executable." % constants.NODE_INITD_SCRIPT)
522

    
523
  def Exec(self, feedback_fn):
524
    """Initialize the cluster.
525

526
    """
527
    clustername = self.clustername
528
    hostname = self.hostname
529

    
530
    # set up the simple store
531
    self.sstore = ss = ssconf.SimpleStore()
532
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
533
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
534
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
535
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
536
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
537
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
538

    
539
    # set up the inter-node password and certificate
540
    _InitGanetiServerSetup(ss)
541

    
542
    # start the master ip
543
    rpc.call_node_start_master(hostname.name)
544

    
545
    # set up ssh config and /etc/hosts
546
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
547
    try:
548
      sshline = f.read()
549
    finally:
550
      f.close()
551
    sshkey = sshline.split(" ")[1]
552

    
553
    _AddHostToEtcHosts(hostname.name)
554
    _InitSSHSetup(hostname.name)
555

    
556
    # init of cluster config file
557
    self.cfg = cfgw = config.ConfigWriter()
558
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
559
                    sshkey, self.op.mac_prefix,
560
                    self.op.vg_name, self.op.def_bridge)
561

    
562
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
563

    
564

    
565
class LUDestroyCluster(NoHooksLU):
566
  """Logical unit for destroying the cluster.
567

568
  """
569
  _OP_REQP = []
570

    
571
  def CheckPrereq(self):
572
    """Check prerequisites.
573

574
    This checks whether the cluster is empty.
575

576
    Any errors are signalled by raising errors.OpPrereqError.
577

578
    """
579
    master = self.sstore.GetMasterNode()
580

    
581
    nodelist = self.cfg.GetNodeList()
582
    if len(nodelist) != 1 or nodelist[0] != master:
583
      raise errors.OpPrereqError("There are still %d node(s) in"
584
                                 " this cluster." % (len(nodelist) - 1))
585
    instancelist = self.cfg.GetInstanceList()
586
    if instancelist:
587
      raise errors.OpPrereqError("There are still %d instance(s) in"
588
                                 " this cluster." % len(instancelist))
589

    
590
  def Exec(self, feedback_fn):
591
    """Destroys the cluster.
592

593
    """
594
    master = self.sstore.GetMasterNode()
595
    if not rpc.call_node_stop_master(master):
596
      raise errors.OpExecError("Could not disable the master role")
597
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
598
    utils.CreateBackup(priv_key)
599
    utils.CreateBackup(pub_key)
600
    rpc.call_node_leave_cluster(master)
601

    
602

    
603
class LUVerifyCluster(NoHooksLU):
604
  """Verifies the cluster status.
605

606
  """
607
  _OP_REQP = ["skip_checks"]
608

    
609
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
610
                  remote_version, feedback_fn):
611
    """Run multiple tests against a node.
612

613
    Test list:
614
      - compares ganeti version
615
      - checks vg existence and size > 20G
616
      - checks config file checksum
617
      - checks ssh to other nodes
618

619
    Args:
620
      node: name of the node to check
621
      file_list: required list of files
622
      local_cksum: dictionary of local files and their checksums
623

624
    """
625
    # compares ganeti version
626
    local_version = constants.PROTOCOL_VERSION
627
    if not remote_version:
628
      feedback_fn("  - ERROR: connection to %s failed" % (node))
629
      return True
630

    
631
    if local_version != remote_version:
632
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
633
                      (local_version, node, remote_version))
634
      return True
635

    
636
    # checks vg existence and size > 20G
637

    
638
    bad = False
639
    if not vglist:
640
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
641
                      (node,))
642
      bad = True
643
    else:
644
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
645
      if vgstatus:
646
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
647
        bad = True
648

    
649
    # checks config file checksum
650
    # checks ssh to any
651

    
652
    if 'filelist' not in node_result:
653
      bad = True
654
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
655
    else:
656
      remote_cksum = node_result['filelist']
657
      for file_name in file_list:
658
        if file_name not in remote_cksum:
659
          bad = True
660
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
661
        elif remote_cksum[file_name] != local_cksum[file_name]:
662
          bad = True
663
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
664

    
665
    if 'nodelist' not in node_result:
666
      bad = True
667
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
668
    else:
669
      if node_result['nodelist']:
670
        bad = True
671
        for node in node_result['nodelist']:
672
          feedback_fn("  - ERROR: communication with node '%s': %s" %
673
                          (node, node_result['nodelist'][node]))
674
    hyp_result = node_result.get('hypervisor', None)
675
    if hyp_result is not None:
676
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
677
    return bad
678

    
679
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
680
                      node_instance, feedback_fn):
681
    """Verify an instance.
682

683
    This function checks to see if the required block devices are
684
    available on the instance's node.
685

686
    """
687
    bad = False
688

    
689
    node_current = instanceconfig.primary_node
690

    
691
    node_vol_should = {}
692
    instanceconfig.MapLVsByNode(node_vol_should)
693

    
694
    for node in node_vol_should:
695
      for volume in node_vol_should[node]:
696
        if node not in node_vol_is or volume not in node_vol_is[node]:
697
          feedback_fn("  - ERROR: volume %s missing on node %s" %
698
                          (volume, node))
699
          bad = True
700

    
701
    if instanceconfig.status != 'down':
702
      if (node_current not in node_instance or
703
          not instance in node_instance[node_current]):
704
        feedback_fn("  - ERROR: instance %s not running on node %s" %
705
                        (instance, node_current))
706
        bad = True
707

    
708
    for node in node_instance:
709
      if node != node_current:
710
        if instance in node_instance[node]:
711
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
712
                          (instance, node))
713
          bad = True
714

    
715
    return bad
716

    
717
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
718
    """Verify if there are any unknown volumes in the cluster.
719

720
    The .os, .swap and backup volumes are ignored. All other volumes are
721
    reported as unknown.
722

723
    """
724
    bad = False
725

    
726
    for node in node_vol_is:
727
      for volume in node_vol_is[node]:
728
        if node not in node_vol_should or volume not in node_vol_should[node]:
729
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
730
                      (volume, node))
731
          bad = True
732
    return bad
733

    
734
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
735
    """Verify the list of running instances.
736

737
    This checks what instances are running but unknown to the cluster.
738

739
    """
740
    bad = False
741
    for node in node_instance:
742
      for runninginstance in node_instance[node]:
743
        if runninginstance not in instancelist:
744
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
745
                          (runninginstance, node))
746
          bad = True
747
    return bad
748

    
749
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
750
    """Verify N+1 Memory Resilience.
751

752
    Check that if one single node dies we can still start all the instances it
753
    was primary for.
754

755
    """
756
    bad = False
757

    
758
    for node, nodeinfo in node_info.iteritems():
759
      # This code checks that every node which is now listed as secondary has
760
      # enough memory to host all instances it is secondary for, should a single
761
      # other node in the cluster fail.
762
      # FIXME: not ready for failover to an arbitrary node
763
      # FIXME: does not support file-backed instances
764
      # WARNING: we currently take into account down instances as well as up
765
      # ones, considering that even if they're down someone might want to start
766
      # them even in the event of a node failure.
767
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
768
        needed_mem = 0
769
        for instance in instances:
770
          needed_mem += instance_cfg[instance].memory
771
        if nodeinfo['mfree'] < needed_mem:
772
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
773
                      " failovers should node %s fail" % (node, prinode))
774
          bad = True
775
    return bad
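  # Illustrative example (not part of the original module) of the check above,
  # with made-up names and sizes: node "nodeC" reports mfree=1024 (MiB) and is
  # secondary for two instances whose primary is "nodeA":
  #
  #   node_info["nodeC"]["sinst-by-pnode"] == {"nodeA": ["inst1", "inst2"]}
  #   instance_cfg["inst1"].memory == 1024
  #   instance_cfg["inst2"].memory == 512
  #
  # needed_mem for prinode "nodeA" is 1536 > 1024, so the method reports
  # "not enough memory on node nodeC to accommodate failovers should node
  # nodeA fail" and returns True (bad).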
776

    
777
  def CheckPrereq(self):
778
    """Check prerequisites.
779

780
    Transform the list of checks we're going to skip into a set and check that
781
    all its members are valid.
782

783
    """
784
    self.skip_set = frozenset(self.op.skip_checks)
785
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
786
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
787

    
788
  def Exec(self, feedback_fn):
789
    """Verify integrity of cluster, performing various test on nodes.
790

791
    """
792
    bad = False
793
    feedback_fn("* Verifying global settings")
794
    for msg in self.cfg.VerifyConfig():
795
      feedback_fn("  - ERROR: %s" % msg)
796

    
797
    vg_name = self.cfg.GetVGName()
798
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
799
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
800
    i_non_redundant = [] # Non-redundant instances
801
    node_volume = {}
802
    node_instance = {}
803
    node_info = {}
804
    instance_cfg = {}
805

    
806
    # FIXME: verify OS list
807
    # do local checksums
808
    file_names = list(self.sstore.GetFileList())
809
    file_names.append(constants.SSL_CERT_FILE)
810
    file_names.append(constants.CLUSTER_CONF_FILE)
811
    local_checksums = utils.FingerprintFiles(file_names)
812

    
813
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
814
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
815
    all_instanceinfo = rpc.call_instance_list(nodelist)
816
    all_vglist = rpc.call_vg_list(nodelist)
817
    node_verify_param = {
818
      'filelist': file_names,
819
      'nodelist': nodelist,
820
      'hypervisor': None,
821
      }
822
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
823
    all_rversion = rpc.call_version(nodelist)
824
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
825

    
826
    for node in nodelist:
827
      feedback_fn("* Verifying node %s" % node)
828
      result = self._VerifyNode(node, file_names, local_checksums,
829
                                all_vglist[node], all_nvinfo[node],
830
                                all_rversion[node], feedback_fn)
831
      bad = bad or result
832

    
833
      # node_volume
834
      volumeinfo = all_volumeinfo[node]
835

    
836
      if isinstance(volumeinfo, basestring):
837
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
838
                    (node, volumeinfo[-400:].encode('string_escape')))
839
        bad = True
840
        node_volume[node] = {}
841
      elif not isinstance(volumeinfo, dict):
842
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
843
        bad = True
844
        continue
845
      else:
846
        node_volume[node] = volumeinfo
847

    
848
      # node_instance
849
      nodeinstance = all_instanceinfo[node]
850
      if type(nodeinstance) != list:
851
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
852
        bad = True
853
        continue
854

    
855
      node_instance[node] = nodeinstance
856

    
857
      # node_info
858
      nodeinfo = all_ninfo[node]
859
      if not isinstance(nodeinfo, dict):
860
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
861
        bad = True
862
        continue
863

    
864
      try:
865
        node_info[node] = {
866
          "mfree": int(nodeinfo['memory_free']),
867
          "dfree": int(nodeinfo['vg_free']),
868
          "pinst": [],
869
          "sinst": [],
870
          # dictionary holding all instances this node is secondary for,
871
          # grouped by their primary node. Each key is a cluster node, and each
872
          # value is a list of instances which have the key as primary and the
873
          # current node as secondary.  this is handy to calculate N+1 memory
874
          # availability if you can only failover from a primary to its
875
          # secondary.
876
          "sinst-by-pnode": {},
877
        }
878
      except ValueError:
879
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
880
        bad = True
881
        continue
882

    
883
    node_vol_should = {}
884

    
885
    for instance in instancelist:
886
      feedback_fn("* Verifying instance %s" % instance)
887
      inst_config = self.cfg.GetInstanceInfo(instance)
888
      result =  self._VerifyInstance(instance, inst_config, node_volume,
889
                                     node_instance, feedback_fn)
890
      bad = bad or result
891

    
892
      inst_config.MapLVsByNode(node_vol_should)
893

    
894
      instance_cfg[instance] = inst_config
895

    
896
      pnode = inst_config.primary_node
897
      if pnode in node_info:
898
        node_info[pnode]['pinst'].append(instance)
899
      else:
900
        feedback_fn("  - ERROR: instance %s, connection to primary node"
901
                    " %s failed" % (instance, pnode))
902
        bad = True
903

    
904
      # If the instance is non-redundant we cannot survive losing its primary
905
      # node, so we are not N+1 compliant. On the other hand we have no disk
906
      # templates with more than one secondary so that situation is not well
907
      # supported either.
908
      # FIXME: does not support file-backed instances
909
      if len(inst_config.secondary_nodes) == 0:
910
        i_non_redundant.append(instance)
911
      elif len(inst_config.secondary_nodes) > 1:
912
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
913
                    % instance)
914

    
915
      for snode in inst_config.secondary_nodes:
916
        if snode in node_info:
917
          node_info[snode]['sinst'].append(instance)
918
          if pnode not in node_info[snode]['sinst-by-pnode']:
919
            node_info[snode]['sinst-by-pnode'][pnode] = []
920
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
921
        else:
922
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
923
                      " %s failed" % (instance, snode))
924

    
925
    feedback_fn("* Verifying orphan volumes")
926
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
927
                                       feedback_fn)
928
    bad = bad or result
929

    
930
    feedback_fn("* Verifying remaining instances")
931
    result = self._VerifyOrphanInstances(instancelist, node_instance,
932
                                         feedback_fn)
933
    bad = bad or result
934

    
935
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
936
      feedback_fn("* Verifying N+1 Memory redundancy")
937
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
938
      bad = bad or result
939

    
940
    feedback_fn("* Other Notes")
941
    if i_non_redundant:
942
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
943
                  % len(i_non_redundant))
944

    
945
    return int(bad)
946

    
947

    
948
class LUVerifyDisks(NoHooksLU):
949
  """Verifies the cluster disks status.
950

951
  """
952
  _OP_REQP = []
953

    
954
  def CheckPrereq(self):
955
    """Check prerequisites.
956

957
    This has no prerequisites.
958

959
    """
960
    pass
961

    
962
  def Exec(self, feedback_fn):
963
    """Verify integrity of cluster disks.
964

965
    """
966
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
967

    
968
    vg_name = self.cfg.GetVGName()
969
    nodes = utils.NiceSort(self.cfg.GetNodeList())
970
    instances = [self.cfg.GetInstanceInfo(name)
971
                 for name in self.cfg.GetInstanceList()]
972

    
973
    nv_dict = {}
974
    for inst in instances:
975
      inst_lvs = {}
976
      if (inst.status != "up" or
977
          inst.disk_template not in constants.DTS_NET_MIRROR):
978
        continue
979
      inst.MapLVsByNode(inst_lvs)
980
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
981
      for node, vol_list in inst_lvs.iteritems():
982
        for vol in vol_list:
983
          nv_dict[(node, vol)] = inst
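    # Illustrative example (not part of the original module) of the reshaping
    # above, with made-up names: if inst_lvs for instance "inst1" is
    #
    #   {"node1": ["vol1", "vol2"], "node2": ["vol1", "vol2"]}
    #
    # then the loop adds the reverse-lookup entries
    #
    #   nv_dict[("node1", "vol1")] = nv_dict[("node1", "vol2")] = <inst1>
    #   nv_dict[("node2", "vol1")] = nv_dict[("node2", "vol2")] = <inst1>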
984

    
985
    if not nv_dict:
986
      return result
987

    
988
    node_lvs = rpc.call_volume_list(nodes, vg_name)
989

    
990
    to_act = set()
991
    for node in nodes:
992
      # node_volume
993
      lvs = node_lvs[node]
994

    
995
      if isinstance(lvs, basestring):
996
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
997
        res_nlvm[node] = lvs
998
      elif not isinstance(lvs, dict):
999
        logger.Info("connection to node %s failed or invalid data returned" %
1000
                    (node,))
1001
        res_nodes.append(node)
1002
        continue
1003

    
1004
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1005
        inst = nv_dict.pop((node, lv_name), None)
1006
        if (not lv_online and inst is not None
1007
            and inst.name not in res_instances):
1008
          res_instances.append(inst.name)
1009

    
1010
    # any leftover items in nv_dict are missing LVs, let's arrange the
1011
    # data better
1012
    for key, inst in nv_dict.iteritems():
1013
      if inst.name not in res_missing:
1014
        res_missing[inst.name] = []
1015
      res_missing[inst.name].append(key)
1016

    
1017
    return result
1018

    
1019

    
1020
class LURenameCluster(LogicalUnit):
1021
  """Rename the cluster.
1022

1023
  """
1024
  HPATH = "cluster-rename"
1025
  HTYPE = constants.HTYPE_CLUSTER
1026
  _OP_REQP = ["name"]
1027

    
1028
  def BuildHooksEnv(self):
1029
    """Build hooks env.
1030

1031
    """
1032
    env = {
1033
      "OP_TARGET": self.sstore.GetClusterName(),
1034
      "NEW_NAME": self.op.name,
1035
      }
1036
    mn = self.sstore.GetMasterNode()
1037
    return env, [mn], [mn]
1038

    
1039
  def CheckPrereq(self):
1040
    """Verify that the passed name is a valid one.
1041

1042
    """
1043
    hostname = utils.HostInfo(self.op.name)
1044

    
1045
    new_name = hostname.name
1046
    self.ip = new_ip = hostname.ip
1047
    old_name = self.sstore.GetClusterName()
1048
    old_ip = self.sstore.GetMasterIP()
1049
    if new_name == old_name and new_ip == old_ip:
1050
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1051
                                 " cluster has changed")
1052
    if new_ip != old_ip:
1053
      result = utils.RunCmd(["fping", "-q", new_ip])
1054
      if not result.failed:
1055
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1056
                                   " reachable on the network. Aborting." %
1057
                                   new_ip)
1058

    
1059
    self.op.name = new_name
1060

    
1061
  def Exec(self, feedback_fn):
1062
    """Rename the cluster.
1063

1064
    """
1065
    clustername = self.op.name
1066
    ip = self.ip
1067
    ss = self.sstore
1068

    
1069
    # shutdown the master IP
1070
    master = ss.GetMasterNode()
1071
    if not rpc.call_node_stop_master(master):
1072
      raise errors.OpExecError("Could not disable the master role")
1073

    
1074
    try:
1075
      # modify the sstore
1076
      ss.SetKey(ss.SS_MASTER_IP, ip)
1077
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1078

    
1079
      # Distribute updated ss config to all nodes
1080
      myself = self.cfg.GetNodeInfo(master)
1081
      dist_nodes = self.cfg.GetNodeList()
1082
      if myself.name in dist_nodes:
1083
        dist_nodes.remove(myself.name)
1084

    
1085
      logger.Debug("Copying updated ssconf data to all nodes")
1086
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1087
        fname = ss.KeyToFilename(keyname)
1088
        result = rpc.call_upload_file(dist_nodes, fname)
1089
        for to_node in dist_nodes:
1090
          if not result[to_node]:
1091
            logger.Error("copy of file %s to node %s failed" %
1092
                         (fname, to_node))
1093
    finally:
1094
      if not rpc.call_node_start_master(master):
1095
        logger.Error("Could not re-enable the master role on the master,"
1096
                     " please restart manually.")
1097

    
1098

    
1099
def _RecursiveCheckIfLVMBased(disk):
1100
  """Check if the given disk or its children are lvm-based.
1101

1102
  Args:
1103
    disk: ganeti.objects.Disk object
1104

1105
  Returns:
1106
    boolean indicating whether an LD_LV dev_type was found or not
1107

1108
  """
1109
  if disk.children:
1110
    for chdisk in disk.children:
1111
      if _RecursiveCheckIfLVMBased(chdisk):
1112
        return True
1113
  return disk.dev_type == constants.LD_LV
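# Illustrative example (not part of the original module); objects.Disk fields
# other than dev_type/children are omitted and the mirror dev_type below is
# purely hypothetical:
#
#   lv_child = objects.Disk(dev_type=constants.LD_LV)
#   mirror   = objects.Disk(dev_type="some-mirror-type", children=[lv_child])
#   _RecursiveCheckIfLVMBased(mirror)    ->  True  (an LV child was found)
#   _RecursiveCheckIfLVMBased(lv_child)  ->  True  (the disk itself is an LV)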
1114

    
1115

    
1116
class LUSetClusterParams(LogicalUnit):
1117
  """Change the parameters of the cluster.
1118

1119
  """
1120
  HPATH = "cluster-modify"
1121
  HTYPE = constants.HTYPE_CLUSTER
1122
  _OP_REQP = []
1123

    
1124
  def BuildHooksEnv(self):
1125
    """Build hooks env.
1126

1127
    """
1128
    env = {
1129
      "OP_TARGET": self.sstore.GetClusterName(),
1130
      "NEW_VG_NAME": self.op.vg_name,
1131
      }
1132
    mn = self.sstore.GetMasterNode()
1133
    return env, [mn], [mn]
1134

    
1135
  def CheckPrereq(self):
1136
    """Check prerequisites.
1137

1138
    This checks whether the given params don't conflict and
1139
    if the given volume group is valid.
1140

1141
    """
1142
    if not self.op.vg_name:
1143
      instances = [self.cfg.GetInstanceInfo(name)
1144
                   for name in self.cfg.GetInstanceList()]
1145
      for inst in instances:
1146
        for disk in inst.disks:
1147
          if _RecursiveCheckIfLVMBased(disk):
1148
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1149
                                       " lvm-based instances exist")
1150

    
1151
    # if vg_name not None, checks given volume group on all nodes
1152
    if self.op.vg_name:
1153
      node_list = self.cfg.GetNodeList()
1154
      vglist = rpc.call_vg_list(node_list)
1155
      for node in node_list:
1156
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1157
        if vgstatus:
1158
          raise errors.OpPrereqError("Error on node '%s': %s" %
1159
                                     (node, vgstatus))
1160

    
1161
  def Exec(self, feedback_fn):
1162
    """Change the parameters of the cluster.
1163

1164
    """
1165
    if self.op.vg_name != self.cfg.GetVGName():
1166
      self.cfg.SetVGName(self.op.vg_name)
1167
    else:
1168
      feedback_fn("Cluster LVM configuration already in desired"
1169
                  " state, not changing")
1170

    
1171

    
1172
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1173
  """Sleep and poll for an instance's disk to sync.
1174

1175
  """
1176
  if not instance.disks:
1177
    return True
1178

    
1179
  if not oneshot:
1180
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1181

    
1182
  node = instance.primary_node
1183

    
1184
  for dev in instance.disks:
1185
    cfgw.SetDiskID(dev, node)
1186

    
1187
  retries = 0
1188
  while True:
1189
    max_time = 0
1190
    done = True
1191
    cumul_degraded = False
1192
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1193
    if not rstats:
1194
      proc.LogWarning("Can't get any data from node %s" % node)
1195
      retries += 1
1196
      if retries >= 10:
1197
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1198
                                 " aborting." % node)
1199
      time.sleep(6)
1200
      continue
1201
    retries = 0
1202
    for i in range(len(rstats)):
1203
      mstat = rstats[i]
1204
      if mstat is None:
1205
        proc.LogWarning("Can't compute data for node %s/%s" %
1206
                        (node, instance.disks[i].iv_name))
1207
        continue
1208
      # we ignore the ldisk parameter
1209
      perc_done, est_time, is_degraded, _ = mstat
1210
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1211
      if perc_done is not None:
1212
        done = False
1213
        if est_time is not None:
1214
          rem_time = "%d estimated seconds remaining" % est_time
1215
          max_time = est_time
1216
        else:
1217
          rem_time = "no time estimate"
1218
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1219
                     (instance.disks[i].iv_name, perc_done, rem_time))
1220
    if done or oneshot:
1221
      break
1222

    
1223
    if unlock:
1224
      utils.Unlock('cmd')
1225
    try:
1226
      time.sleep(min(60, max_time))
1227
    finally:
1228
      if unlock:
1229
        utils.Lock('cmd')
1230

    
1231
  if done:
1232
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1233
  return not cumul_degraded
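# Illustrative note (not part of the original module) on the mirror status
# tuples consumed above: each entry returned by
# rpc.call_blockdev_getmirrorstatus() is unpacked as
# (perc_done, est_time, is_degraded, ldisk), e.g.
#
#   (45.5, 120, True, False)   ->  logged as "45.50% done, 120 estimated
#                                  seconds remaining"; polling continues
#   (None, None, False, False) ->  this device is in sync (perc_done is None)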
1234

    
1235

    
1236
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1237
  """Check that mirrors are not degraded.
1238

1239
  The ldisk parameter, if True, will change the test from the
1240
  is_degraded attribute (which represents overall non-ok status for
1241
  the device(s)) to the ldisk (representing the local storage status).
1242

1243
  """
1244
  cfgw.SetDiskID(dev, node)
1245
  if ldisk:
1246
    idx = 6
1247
  else:
1248
    idx = 5
1249

    
1250
  result = True
1251
  if on_primary or dev.AssembleOnSecondary():
1252
    rstats = rpc.call_blockdev_find(node, dev)
1253
    if not rstats:
1254
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1255
      result = False
1256
    else:
1257
      result = result and (not rstats[idx])
1258
  if dev.children:
1259
    for child in dev.children:
1260
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1261

    
1262
  return result
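# Illustrative note (not part of the original module): per the docstring
# above, the status tuple returned by rpc.call_blockdev_find() carries the
# overall is_degraded flag at index 5 and the local-disk (ldisk) status at
# index 6, which is why ldisk=True switches idx from 5 to 6. For example, a
# result with rstats[5] == False and rstats[6] == True would pass the default
# check but fail a _CheckDiskConsistency(..., ldisk=True) check.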
1263

    
1264

    
1265
class LUDiagnoseOS(NoHooksLU):
1266
  """Logical unit for OS diagnose/query.
1267

1268
  """
1269
  _OP_REQP = ["output_fields", "names"]
1270

    
1271
  def CheckPrereq(self):
1272
    """Check prerequisites.
1273

1274
    This always succeeds, since this is a pure query LU.
1275

1276
    """
1277
    if self.op.names:
1278
      raise errors.OpPrereqError("Selective OS query not supported")
1279

    
1280
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1281
    _CheckOutputFields(static=[],
1282
                       dynamic=self.dynamic_fields,
1283
                       selected=self.op.output_fields)
1284

    
1285
  @staticmethod
1286
  def _DiagnoseByOS(node_list, rlist):
1287
    """Remaps a per-node return list into an a per-os per-node dictionary
1288

1289
      Args:
1290
        node_list: a list with the names of all nodes
1291
        rlist: a map with node names as keys and OS objects as values
1292

1293
      Returns:
1294
        map: a map with osnames as keys and as value another map, with
1295
             node names as keys and lists of OS objects as values
1297
             e.g. {"debian-etch": {"node1": [<object>,...],
1298
                                   "node2": [<object>,]}
1299
                  }
1300

1301
    """
1302
    all_os = {}
1303
    for node_name, nr in rlist.iteritems():
1304
      if not nr:
1305
        continue
1306
      for os in nr:
1307
        if os.name not in all_os:
1308
          # build a list of nodes for this os containing empty lists
1309
          # for each node in node_list
1310
          all_os[os.name] = {}
1311
          for nname in node_list:
1312
            all_os[os.name][nname] = []
1313
        all_os[os.name][node_name].append(os)
1314
    return all_os
1315

    
1316
  def Exec(self, feedback_fn):
1317
    """Compute the list of OSes.
1318

1319
    """
1320
    node_list = self.cfg.GetNodeList()
1321
    node_data = rpc.call_os_diagnose(node_list)
1322
    if node_data == False:
1323
      raise errors.OpExecError("Can't gather the list of OSes")
1324
    pol = self._DiagnoseByOS(node_list, node_data)
1325
    output = []
1326
    for os_name, os_data in pol.iteritems():
1327
      row = []
1328
      for field in self.op.output_fields:
1329
        if field == "name":
1330
          val = os_name
1331
        elif field == "valid":
1332
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1333
        elif field == "node_status":
1334
          val = {}
1335
          for node_name, nos_list in os_data.iteritems():
1336
            val[node_name] = [(v.status, v.path) for v in nos_list]
1337
        else:
1338
          raise errors.ParameterError(field)
1339
        row.append(val)
1340
      output.append(row)
1341

    
1342
    return output
1343

    
1344

    
1345
class LURemoveNode(LogicalUnit):
1346
  """Logical unit for removing a node.
1347

1348
  """
1349
  HPATH = "node-remove"
1350
  HTYPE = constants.HTYPE_NODE
1351
  _OP_REQP = ["node_name"]
1352

    
1353
  def BuildHooksEnv(self):
1354
    """Build hooks env.
1355

1356
    This doesn't run on the target node in the pre phase as a failed
1357
    node would not allow itself to run.
1358

1359
    """
1360
    env = {
1361
      "OP_TARGET": self.op.node_name,
1362
      "NODE_NAME": self.op.node_name,
1363
      }
1364
    all_nodes = self.cfg.GetNodeList()
1365
    all_nodes.remove(self.op.node_name)
1366
    return env, all_nodes, all_nodes
1367

    
1368
  def CheckPrereq(self):
1369
    """Check prerequisites.
1370

1371
    This checks:
1372
     - the node exists in the configuration
1373
     - it does not have primary or secondary instances
1374
     - it's not the master
1375

1376
    Any errors are signalled by raising errors.OpPrereqError.
1377

1378
    """
1379
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1380
    if node is None:
1381
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1382

    
1383
    instance_list = self.cfg.GetInstanceList()
1384

    
1385
    masternode = self.sstore.GetMasterNode()
1386
    if node.name == masternode:
1387
      raise errors.OpPrereqError("Node is the master node,"
1388
                                 " you need to failover first.")
1389

    
1390
    for instance_name in instance_list:
1391
      instance = self.cfg.GetInstanceInfo(instance_name)
1392
      if node.name == instance.primary_node:
1393
        raise errors.OpPrereqError("Instance %s still running on the node,"
1394
                                   " please remove first." % instance_name)
1395
      if node.name in instance.secondary_nodes:
1396
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1397
                                   " please remove first." % instance_name)
1398
    self.op.node_name = node.name
1399
    self.node = node
1400

    
1401
  def Exec(self, feedback_fn):
1402
    """Removes the node from the cluster.
1403

1404
    """
1405
    node = self.node
1406
    logger.Info("stopping the node daemon and removing configs from node %s" %
1407
                node.name)
1408

    
1409
    rpc.call_node_leave_cluster(node.name)
1410

    
1411
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1412

    
1413
    logger.Info("Removing node %s from config" % node.name)
1414

    
1415
    self.cfg.RemoveNode(node.name)
1416

    
1417
    _RemoveHostFromEtcHosts(node.name)
1418

    
1419

    
1420
class LUQueryNodes(NoHooksLU):
1421
  """Logical unit for querying nodes.
1422

1423
  """
1424
  _OP_REQP = ["output_fields", "names"]
1425

    
1426
  def CheckPrereq(self):
1427
    """Check prerequisites.
1428

1429
    This checks that the fields required are valid output fields.
1430

1431
    """
1432
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1433
                                     "mtotal", "mnode", "mfree",
1434
                                     "bootid"])
1435

    
1436
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1437
                               "pinst_list", "sinst_list",
1438
                               "pip", "sip"],
1439
                       dynamic=self.dynamic_fields,
1440
                       selected=self.op.output_fields)
1441

    
1442
    self.wanted = _GetWantedNodes(self, self.op.names)
1443

    
1444
  def Exec(self, feedback_fn):
1445
    """Computes the list of nodes and their attributes.
1446

1447
    """
1448
    nodenames = self.wanted
1449
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1450

    
1451
    # begin data gathering
1452

    
1453
    if self.dynamic_fields.intersection(self.op.output_fields):
1454
      live_data = {}
1455
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1456
      for name in nodenames:
1457
        nodeinfo = node_data.get(name, None)
1458
        if nodeinfo:
1459
          live_data[name] = {
1460
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1461
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1462
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1463
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1464
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1465
            "bootid": nodeinfo['bootid'],
1466
            }
1467
        else:
1468
          live_data[name] = {}
1469
    else:
1470
      live_data = dict.fromkeys(nodenames, {})
1471

    
1472
    node_to_primary = dict([(name, set()) for name in nodenames])
1473
    node_to_secondary = dict([(name, set()) for name in nodenames])
1474

    
1475
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1476
                             "sinst_cnt", "sinst_list"))
1477
    if inst_fields & frozenset(self.op.output_fields):
1478
      instancelist = self.cfg.GetInstanceList()
1479

    
1480
      for instance_name in instancelist:
1481
        inst = self.cfg.GetInstanceInfo(instance_name)
1482
        if inst.primary_node in node_to_primary:
1483
          node_to_primary[inst.primary_node].add(inst.name)
1484
        for secnode in inst.secondary_nodes:
1485
          if secnode in node_to_secondary:
1486
            node_to_secondary[secnode].add(inst.name)
1487

    
1488
    # end data gathering
1489

    
1490
    output = []
1491
    for node in nodelist:
1492
      node_output = []
1493
      for field in self.op.output_fields:
1494
        if field == "name":
1495
          val = node.name
1496
        elif field == "pinst_list":
1497
          val = list(node_to_primary[node.name])
1498
        elif field == "sinst_list":
1499
          val = list(node_to_secondary[node.name])
1500
        elif field == "pinst_cnt":
1501
          val = len(node_to_primary[node.name])
1502
        elif field == "sinst_cnt":
1503
          val = len(node_to_secondary[node.name])
1504
        elif field == "pip":
1505
          val = node.primary_ip
1506
        elif field == "sip":
1507
          val = node.secondary_ip
1508
        elif field in self.dynamic_fields:
1509
          val = live_data[node.name].get(field, None)
1510
        else:
1511
          raise errors.ParameterError(field)
1512
        node_output.append(val)
1513
      output.append(node_output)
1514

    
1515
    return output
1516

    
1517

    
1518
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output

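# Example (hypothetical values, not from the original source): with
# output_fields = ["node", "vg", "name", "size", "instance"] each logical
# volume becomes one row of strings (every value passes through str()), e.g.
#   ["node1.example.com", "xenvg", "0123abcd.sda_data", "10240", "instance1"]
# and the instance column is "-" for volumes not mapped to any instance.
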
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)

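# Illustrative sketch (hypothetical password and placeholder paths, not from
# the original source): for a daemon password "s3cr3t" the bootstrap command
# assembled in LUAddNode.Exec above expands roughly to a shell snippet like
#   umask 077 && echo 's3cr3t' > '<noded password file>' &&
#   cat > '<ssl cert file>' << '!EOF.' &&
#   <PEM certificate data>!EOF.
#   <node init script> restart
# which is then executed over ssh on the node being added.
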
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")

class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result

class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))

class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()

class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data

class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info

def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded, with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info

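# Usage sketch (mirrors LUActivateInstanceDisks.Exec above, shown only for
# illustration): callers unpack the pair returned by _AssembleInstanceDisks
# and abort when assembly failed, e.g.
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
# where device_info holds (primary_node, iv_name, assemble_result) tuples.
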
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")

class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)

def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result

def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))

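# Example (hypothetical node name and sizes, not from the original source):
# a failed check raises OpPrereqError with a message of the form
#   Not enough memory on node node1.example.com for starting instance
#   inst1.example.com: needed 512 MiB, available 256 MiB
# the "reason" argument ("starting instance ...") is supplied by the caller,
# as LUStartupInstance.CheckPrereq below does.
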
class LUStartupInstance(LogicalUnit):
2209
  """Starts an instance.
2210

2211
  """
2212
  HPATH = "instance-start"
2213
  HTYPE = constants.HTYPE_INSTANCE
2214
  _OP_REQP = ["instance_name", "force"]
2215

    
2216
  def BuildHooksEnv(self):
2217
    """Build hooks env.
2218

2219
    This runs on master, primary and secondary nodes of the instance.
2220

2221
    """
2222
    env = {
2223
      "FORCE": self.op.force,
2224
      }
2225
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2226
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2227
          list(self.instance.secondary_nodes))
2228
    return env, nl, nl
2229

    
2230
  def CheckPrereq(self):
2231
    """Check prerequisites.
2232

2233
    This checks that the instance is in the cluster.
2234

2235
    """
2236
    instance = self.cfg.GetInstanceInfo(
2237
      self.cfg.ExpandInstanceName(self.op.instance_name))
2238
    if instance is None:
2239
      raise errors.OpPrereqError("Instance '%s' not known" %
2240
                                 self.op.instance_name)
2241

    
2242
    # check bridges existance
2243
    _CheckInstanceBridgesExist(instance)
2244

    
2245
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2246
                         "starting instance %s" % instance.name,
2247
                         instance.memory)
2248

    
2249
    self.instance = instance
2250
    self.op.instance_name = instance.name
2251

    
2252
  def Exec(self, feedback_fn):
2253
    """Start the instance.
2254

2255
    """
2256
    instance = self.instance
2257
    force = self.op.force
2258
    extra_args = getattr(self.op, "extra_args", "")
2259

    
2260
    self.cfg.MarkInstanceUp(instance.name)
2261

    
2262
    node_current = instance.primary_node
2263

    
2264
    _StartInstanceDisks(self.cfg, instance, force)
2265

    
2266
    if not rpc.call_instance_start(node_current, instance, extra_args):
2267
      _ShutdownInstanceDisks(instance, self.cfg)
2268
      raise errors.OpExecError("Could not start instance")
2269

    
2270

    
2271
class LURebootInstance(LogicalUnit):
2272
  """Reboot an instance.
2273

2274
  """
2275
  HPATH = "instance-reboot"
2276
  HTYPE = constants.HTYPE_INSTANCE
2277
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2278

    
2279
  def BuildHooksEnv(self):
2280
    """Build hooks env.
2281

2282
    This runs on master, primary and secondary nodes of the instance.
2283

2284
    """
2285
    env = {
2286
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2287
      }
2288
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2289
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2290
          list(self.instance.secondary_nodes))
2291
    return env, nl, nl
2292

    
2293
  def CheckPrereq(self):
2294
    """Check prerequisites.
2295

2296
    This checks that the instance is in the cluster.
2297

2298
    """
2299
    instance = self.cfg.GetInstanceInfo(
2300
      self.cfg.ExpandInstanceName(self.op.instance_name))
2301
    if instance is None:
2302
      raise errors.OpPrereqError("Instance '%s' not known" %
2303
                                 self.op.instance_name)
2304

    
2305
    # check bridges existance
2306
    _CheckInstanceBridgesExist(instance)
2307

    
2308
    self.instance = instance
2309
    self.op.instance_name = instance.name
2310

    
2311
  def Exec(self, feedback_fn):
2312
    """Reboot the instance.
2313

2314
    """
2315
    instance = self.instance
2316
    ignore_secondaries = self.op.ignore_secondaries
2317
    reboot_type = self.op.reboot_type
2318
    extra_args = getattr(self.op, "extra_args", "")
2319

    
2320
    node_current = instance.primary_node
2321

    
2322
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2323
                           constants.INSTANCE_REBOOT_HARD,
2324
                           constants.INSTANCE_REBOOT_FULL]:
2325
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2326
                                  (constants.INSTANCE_REBOOT_SOFT,
2327
                                   constants.INSTANCE_REBOOT_HARD,
2328
                                   constants.INSTANCE_REBOOT_FULL))
2329

    
2330
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2331
                       constants.INSTANCE_REBOOT_HARD]:
2332
      if not rpc.call_instance_reboot(node_current, instance,
2333
                                      reboot_type, extra_args):
2334
        raise errors.OpExecError("Could not reboot instance")
2335
    else:
2336
      if not rpc.call_instance_shutdown(node_current, instance):
2337
        raise errors.OpExecError("could not shutdown instance for full reboot")
2338
      _ShutdownInstanceDisks(instance, self.cfg)
2339
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2340
      if not rpc.call_instance_start(node_current, instance, extra_args):
2341
        _ShutdownInstanceDisks(instance, self.cfg)
2342
        raise errors.OpExecError("Could not start instance for full reboot")
2343

    
2344
    self.cfg.MarkInstanceUp(instance.name)
2345

    
2346

    
2347
class LUShutdownInstance(LogicalUnit):
2348
  """Shutdown an instance.
2349

2350
  """
2351
  HPATH = "instance-stop"
2352
  HTYPE = constants.HTYPE_INSTANCE
2353
  _OP_REQP = ["instance_name"]
2354

    
2355
  def BuildHooksEnv(self):
2356
    """Build hooks env.
2357

2358
    This runs on master, primary and secondary nodes of the instance.
2359

2360
    """
2361
    env = _BuildInstanceHookEnvByObject(self.instance)
2362
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2363
          list(self.instance.secondary_nodes))
2364
    return env, nl, nl
2365

    
2366
  def CheckPrereq(self):
2367
    """Check prerequisites.
2368

2369
    This checks that the instance is in the cluster.
2370

2371
    """
2372
    instance = self.cfg.GetInstanceInfo(
2373
      self.cfg.ExpandInstanceName(self.op.instance_name))
2374
    if instance is None:
2375
      raise errors.OpPrereqError("Instance '%s' not known" %
2376
                                 self.op.instance_name)
2377
    self.instance = instance
2378

    
2379
  def Exec(self, feedback_fn):
2380
    """Shutdown the instance.
2381

2382
    """
2383
    instance = self.instance
2384
    node_current = instance.primary_node
2385
    self.cfg.MarkInstanceDown(instance.name)
2386
    if not rpc.call_instance_shutdown(node_current, instance):
2387
      logger.Error("could not shutdown instance")
2388

    
2389
    _ShutdownInstanceDisks(instance, self.cfg)
2390

    
2391

    
2392
class LUReinstallInstance(LogicalUnit):
2393
  """Reinstall an instance.
2394

2395
  """
2396
  HPATH = "instance-reinstall"
2397
  HTYPE = constants.HTYPE_INSTANCE
2398
  _OP_REQP = ["instance_name"]
2399

    
2400
  def BuildHooksEnv(self):
2401
    """Build hooks env.
2402

2403
    This runs on master, primary and secondary nodes of the instance.
2404

2405
    """
2406
    env = _BuildInstanceHookEnvByObject(self.instance)
2407
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2408
          list(self.instance.secondary_nodes))
2409
    return env, nl, nl
2410

    
2411
  def CheckPrereq(self):
2412
    """Check prerequisites.
2413

2414
    This checks that the instance is in the cluster and is not running.
2415

2416
    """
2417
    instance = self.cfg.GetInstanceInfo(
2418
      self.cfg.ExpandInstanceName(self.op.instance_name))
2419
    if instance is None:
2420
      raise errors.OpPrereqError("Instance '%s' not known" %
2421
                                 self.op.instance_name)
2422
    if instance.disk_template == constants.DT_DISKLESS:
2423
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2424
                                 self.op.instance_name)
2425
    if instance.status != "down":
2426
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2427
                                 self.op.instance_name)
2428
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2429
    if remote_info:
2430
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2431
                                 (self.op.instance_name,
2432
                                  instance.primary_node))
2433

    
2434
    self.op.os_type = getattr(self.op, "os_type", None)
2435
    if self.op.os_type is not None:
2436
      # OS verification
2437
      pnode = self.cfg.GetNodeInfo(
2438
        self.cfg.ExpandNodeName(instance.primary_node))
2439
      if pnode is None:
2440
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2441
                                   self.op.pnode)
2442
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2443
      if not os_obj:
2444
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2445
                                   " primary node"  % self.op.os_type)
2446

    
2447
    self.instance = instance
2448

    
2449
  def Exec(self, feedback_fn):
2450
    """Reinstall the instance.
2451

2452
    """
2453
    inst = self.instance
2454

    
2455
    if self.op.os_type is not None:
2456
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2457
      inst.os = self.op.os_type
2458
      self.cfg.AddInstance(inst)
2459

    
2460
    _StartInstanceDisks(self.cfg, inst, None)
2461
    try:
2462
      feedback_fn("Running the instance OS create scripts...")
2463
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2464
        raise errors.OpExecError("Could not install OS for instance %s"
2465
                                 " on node %s" %
2466
                                 (inst.name, inst.primary_node))
2467
    finally:
2468
      _ShutdownInstanceDisks(inst, self.cfg)
2469

    
2470

    
2471
class LURenameInstance(LogicalUnit):
2472
  """Rename an instance.
2473

2474
  """
2475
  HPATH = "instance-rename"
2476
  HTYPE = constants.HTYPE_INSTANCE
2477
  _OP_REQP = ["instance_name", "new_name"]
2478

    
2479
  def BuildHooksEnv(self):
2480
    """Build hooks env.
2481

2482
    This runs on master, primary and secondary nodes of the instance.
2483

2484
    """
2485
    env = _BuildInstanceHookEnvByObject(self.instance)
2486
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2487
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2488
          list(self.instance.secondary_nodes))
2489
    return env, nl, nl
2490

    
2491
  def CheckPrereq(self):
2492
    """Check prerequisites.
2493

2494
    This checks that the instance is in the cluster and is not running.
2495

2496
    """
2497
    instance = self.cfg.GetInstanceInfo(
2498
      self.cfg.ExpandInstanceName(self.op.instance_name))
2499
    if instance is None:
2500
      raise errors.OpPrereqError("Instance '%s' not known" %
2501
                                 self.op.instance_name)
2502
    if instance.status != "down":
2503
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2504
                                 self.op.instance_name)
2505
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2506
    if remote_info:
2507
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2508
                                 (self.op.instance_name,
2509
                                  instance.primary_node))
2510
    self.instance = instance
2511

    
2512
    # new name verification
2513
    name_info = utils.HostInfo(self.op.new_name)
2514

    
2515
    self.op.new_name = new_name = name_info.name
2516
    instance_list = self.cfg.GetInstanceList()
2517
    if new_name in instance_list:
2518
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2519
                                 new_name)
2520

    
2521
    if not getattr(self.op, "ignore_ip", False):
2522
      command = ["fping", "-q", name_info.ip]
2523
      result = utils.RunCmd(command)
2524
      if not result.failed:
2525
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2526
                                   (name_info.ip, new_name))
2527

    
2528

    
2529
  def Exec(self, feedback_fn):
2530
    """Reinstall the instance.
2531

2532
    """
2533
    inst = self.instance
2534
    old_name = inst.name
2535

    
2536
    if inst.disk_template == constants.DT_FILE:
2537
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2538

    
2539
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2540

    
2541
    # re-read the instance from the configuration after rename
2542
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2543

    
2544
    if inst.disk_template == constants.DT_FILE:
2545
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2546
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2547
                                                old_file_storage_dir,
2548
                                                new_file_storage_dir)
2549

    
2550
      if not result:
2551
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2552
                                 " directory '%s' to '%s' (but the instance"
2553
                                 " has been renamed in Ganeti)" % (
2554
                                 inst.primary_node, old_file_storage_dir,
2555
                                 new_file_storage_dir))
2556

    
2557
      if not result[0]:
2558
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2559
                                 " (but the instance has been renamed in"
2560
                                 " Ganeti)" % (old_file_storage_dir,
2561
                                               new_file_storage_dir))
2562

    
2563
    _StartInstanceDisks(self.cfg, inst, None)
2564
    try:
2565
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2566
                                          "sda", "sdb"):
2567
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2568
               " instance has been renamed in Ganeti)" %
2569
               (inst.name, inst.primary_node))
2570
        logger.Error(msg)
2571
    finally:
2572
      _ShutdownInstanceDisks(inst, self.cfg)
2573

    
2574

    
2575
class LURemoveInstance(LogicalUnit):
2576
  """Remove an instance.
2577

2578
  """
2579
  HPATH = "instance-remove"
2580
  HTYPE = constants.HTYPE_INSTANCE
2581
  _OP_REQP = ["instance_name"]
2582

    
2583
  def BuildHooksEnv(self):
2584
    """Build hooks env.
2585

2586
    This runs on master, primary and secondary nodes of the instance.
2587

2588
    """
2589
    env = _BuildInstanceHookEnvByObject(self.instance)
2590
    nl = [self.sstore.GetMasterNode()]
2591
    return env, nl, nl
2592

    
2593
  def CheckPrereq(self):
2594
    """Check prerequisites.
2595

2596
    This checks that the instance is in the cluster.
2597

2598
    """
2599
    instance = self.cfg.GetInstanceInfo(
2600
      self.cfg.ExpandInstanceName(self.op.instance_name))
2601
    if instance is None:
2602
      raise errors.OpPrereqError("Instance '%s' not known" %
2603
                                 self.op.instance_name)
2604
    self.instance = instance
2605

    
2606
  def Exec(self, feedback_fn):
2607
    """Remove the instance.
2608

2609
    """
2610
    instance = self.instance
2611
    logger.Info("shutting down instance %s on node %s" %
2612
                (instance.name, instance.primary_node))
2613

    
2614
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2615
      if self.op.ignore_failures:
2616
        feedback_fn("Warning: can't shutdown instance")
2617
      else:
2618
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2619
                                 (instance.name, instance.primary_node))
2620

    
2621
    logger.Info("removing block devices for instance %s" % instance.name)
2622

    
2623
    if not _RemoveDisks(instance, self.cfg):
2624
      if self.op.ignore_failures:
2625
        feedback_fn("Warning: can't remove instance's disks")
2626
      else:
2627
        raise errors.OpExecError("Can't remove instance's disks")
2628

    
2629
    logger.Info("removing instance %s out of cluster config" % instance.name)
2630

    
2631
    self.cfg.RemoveInstance(instance.name)
2632

    
2633

    
2634
class LUQueryInstances(NoHooksLU):
2635
  """Logical unit for querying instances.
2636

2637
  """
2638
  _OP_REQP = ["output_fields", "names"]
2639

    
2640
  def CheckPrereq(self):
2641
    """Check prerequisites.
2642

2643
    This checks that the fields required are valid output fields.
2644

2645
    """
2646
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2647
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2648
                               "admin_state", "admin_ram",
2649
                               "disk_template", "ip", "mac", "bridge",
2650
                               "sda_size", "sdb_size", "vcpus"],
2651
                       dynamic=self.dynamic_fields,
2652
                       selected=self.op.output_fields)
2653

    
2654
    self.wanted = _GetWantedInstances(self, self.op.names)
2655

    
2656
  def Exec(self, feedback_fn):
2657
    """Computes the list of nodes and their attributes.
2658

2659
    """
2660
    instance_names = self.wanted
2661
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2662
                     in instance_names]
2663

    
2664
    # begin data gathering
2665

    
2666
    nodes = frozenset([inst.primary_node for inst in instance_list])
2667

    
2668
    bad_nodes = []
2669
    if self.dynamic_fields.intersection(self.op.output_fields):
2670
      live_data = {}
2671
      node_data = rpc.call_all_instances_info(nodes)
2672
      for name in nodes:
2673
        result = node_data[name]
2674
        if result:
2675
          live_data.update(result)
2676
        elif result == False:
2677
          bad_nodes.append(name)
2678
        # else no instance is alive
2679
    else:
2680
      live_data = dict([(name, {}) for name in instance_names])
2681

    
2682
    # end data gathering
2683

    
2684
    output = []
2685
    for instance in instance_list:
2686
      iout = []
2687
      for field in self.op.output_fields:
2688
        if field == "name":
2689
          val = instance.name
2690
        elif field == "os":
2691
          val = instance.os
2692
        elif field == "pnode":
2693
          val = instance.primary_node
2694
        elif field == "snodes":
2695
          val = list(instance.secondary_nodes)
2696
        elif field == "admin_state":
2697
          val = (instance.status != "down")
2698
        elif field == "oper_state":
2699
          if instance.primary_node in bad_nodes:
2700
            val = None
2701
          else:
2702
            val = bool(live_data.get(instance.name))
2703
        elif field == "status":
2704
          if instance.primary_node in bad_nodes:
2705
            val = "ERROR_nodedown"
2706
          else:
2707
            running = bool(live_data.get(instance.name))
2708
            if running:
2709
              if instance.status != "down":
2710
                val = "running"
2711
              else:
2712
                val = "ERROR_up"
2713
            else:
2714
              if instance.status != "down":
2715
                val = "ERROR_down"
2716
              else:
2717
                val = "ADMIN_down"
2718
        elif field == "admin_ram":
2719
          val = instance.memory
2720
        elif field == "oper_ram":
2721
          if instance.primary_node in bad_nodes:
2722
            val = None
2723
          elif instance.name in live_data:
2724
            val = live_data[instance.name].get("memory", "?")
2725
          else:
2726
            val = "-"
2727
        elif field == "disk_template":
2728
          val = instance.disk_template
2729
        elif field == "ip":
2730
          val = instance.nics[0].ip
2731
        elif field == "bridge":
2732
          val = instance.nics[0].bridge
2733
        elif field == "mac":
2734
          val = instance.nics[0].mac
2735
        elif field == "sda_size" or field == "sdb_size":
2736
          disk = instance.FindDisk(field[:3])
2737
          if disk is None:
2738
            val = None
2739
          else:
2740
            val = disk.size
2741
        elif field == "vcpus":
2742
          val = instance.vcpus
2743
        else:
2744
          raise errors.ParameterError(field)
2745
        iout.append(val)
2746
      output.append(iout)
2747

    
2748
    return output
2749

    
2750

    
2751
class LUFailoverInstance(LogicalUnit):
2752
  """Failover an instance.
2753

2754
  """
2755
  HPATH = "instance-failover"
2756
  HTYPE = constants.HTYPE_INSTANCE
2757
  _OP_REQP = ["instance_name", "ignore_consistency"]
2758

    
2759
  def BuildHooksEnv(self):
2760
    """Build hooks env.
2761

2762
    This runs on master, primary and secondary nodes of the instance.
2763

2764
    """
2765
    env = {
2766
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2767
      }
2768
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2769
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2770
    return env, nl, nl
2771

    
2772
  def CheckPrereq(self):
2773
    """Check prerequisites.
2774

2775
    This checks that the instance is in the cluster.
2776

2777
    """
2778
    instance = self.cfg.GetInstanceInfo(
2779
      self.cfg.ExpandInstanceName(self.op.instance_name))
2780
    if instance is None:
2781
      raise errors.OpPrereqError("Instance '%s' not known" %
2782
                                 self.op.instance_name)
2783

    
2784
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2785
      raise errors.OpPrereqError("Instance's disk layout is not"
2786
                                 " network mirrored, cannot failover.")
2787

    
2788
    secondary_nodes = instance.secondary_nodes
2789
    if not secondary_nodes:
2790
      raise errors.ProgrammerError("no secondary node but using "
2791
                                   "DT_REMOTE_RAID1 template")
2792

    
2793
    target_node = secondary_nodes[0]
2794
    # check memory requirements on the secondary node
2795
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2796
                         instance.name, instance.memory)
2797

    
2798
    # check bridge existance
2799
    brlist = [nic.bridge for nic in instance.nics]
2800
    if not rpc.call_bridges_exist(target_node, brlist):
2801
      raise errors.OpPrereqError("One or more target bridges %s does not"
2802
                                 " exist on destination node '%s'" %
2803
                                 (brlist, target_node))
2804

    
2805
    self.instance = instance
2806

    
2807
  def Exec(self, feedback_fn):
2808
    """Failover an instance.
2809

2810
    The failover is done by shutting it down on its present node and
2811
    starting it on the secondary.
2812

2813
    """
2814
    instance = self.instance
2815

    
2816
    source_node = instance.primary_node
2817
    target_node = instance.secondary_nodes[0]
2818

    
2819
    feedback_fn("* checking disk consistency between source and target")
2820
    for dev in instance.disks:
2821
      # for remote_raid1, these are md over drbd
2822
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2823
        if instance.status == "up" and not self.op.ignore_consistency:
2824
          raise errors.OpExecError("Disk %s is degraded on target node,"
2825
                                   " aborting failover." % dev.iv_name)
2826

    
2827
    feedback_fn("* shutting down instance on source node")
2828
    logger.Info("Shutting down instance %s on node %s" %
2829
                (instance.name, source_node))
2830

    
2831
    if not rpc.call_instance_shutdown(source_node, instance):
2832
      if self.op.ignore_consistency:
2833
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2834
                     " anyway. Please make sure node %s is down"  %
2835
                     (instance.name, source_node, source_node))
2836
      else:
2837
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2838
                                 (instance.name, source_node))
2839

    
2840
    feedback_fn("* deactivating the instance's disks on source node")
2841
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2842
      raise errors.OpExecError("Can't shut down the instance's disks.")
2843

    
2844
    instance.primary_node = target_node
2845
    # distribute new instance config to the other nodes
2846
    self.cfg.AddInstance(instance)
2847

    
2848
    # Only start the instance if it's marked as up
2849
    if instance.status == "up":
2850
      feedback_fn("* activating the instance's disks on target node")
2851
      logger.Info("Starting instance %s on node %s" %
2852
                  (instance.name, target_node))
2853

    
2854
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2855
                                               ignore_secondaries=True)
2856
      if not disks_ok:
2857
        _ShutdownInstanceDisks(instance, self.cfg)
2858
        raise errors.OpExecError("Can't activate the instance's disks")
2859

    
2860
      feedback_fn("* starting the instance on the target node")
2861
      if not rpc.call_instance_start(target_node, instance, None):
2862
        _ShutdownInstanceDisks(instance, self.cfg)
2863
        raise errors.OpExecError("Could not start instance %s on node %s." %
2864
                                 (instance.name, target_node))
2865

    
2866

    
2867
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2868
  """Create a tree of block devices on the primary node.
2869

2870
  This always creates all devices.
2871

2872
  """
2873
  if device.children:
2874
    for child in device.children:
2875
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2876
        return False
2877

    
2878
  cfg.SetDiskID(device, node)
2879
  new_id = rpc.call_blockdev_create(node, device, device.size,
2880
                                    instance.name, True, info)
2881
  if not new_id:
2882
    return False
2883
  if device.physical_id is None:
2884
    device.physical_id = new_id
2885
  return True
2886

    
2887

    
2888
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2889
  """Create a tree of block devices on a secondary node.
2890

2891
  If this device type has to be created on secondaries, create it and
2892
  all its children.
2893

2894
  If not, just recurse to children keeping the same 'force' value.
2895

2896
  """
2897
  if device.CreateOnSecondary():
2898
    force = True
2899
  if device.children:
2900
    for child in device.children:
2901
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2902
                                        child, force, info):
2903
        return False
2904

    
2905
  if not force:
2906
    return True
2907
  cfg.SetDiskID(device, node)
2908
  new_id = rpc.call_blockdev_create(node, device, device.size,
2909
                                    instance.name, False, info)
2910
  if not new_id:
2911
    return False
2912
  if device.physical_id is None:
2913
    device.physical_id = new_id
2914
  return True
2915

    
2916

    
2917
def _GenerateUniqueNames(cfg, exts):
2918
  """Generate a suitable LV name.
2919

2920
  This will generate a logical volume name for the given instance.
2921

2922
  """
2923
  results = []
2924
  for val in exts:
2925
    new_id = cfg.GenerateUniqueID()
2926
    results.append("%s%s" % (new_id, val))
2927
  return results
2928

    
2929

    
2930
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2931
  """Generate a drbd device complete with its children.
2932

2933
  """
2934
  port = cfg.AllocatePort()
2935
  vgname = cfg.GetVGName()
2936
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2937
                          logical_id=(vgname, names[0]))
2938
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2939
                          logical_id=(vgname, names[1]))
2940
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2941
                          logical_id = (primary, secondary, port),
2942
                          children = [dev_data, dev_meta])
2943
  return drbd_dev
2944

    
2945

    
2946
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2947
  """Generate a drbd8 device complete with its children.
2948

2949
  """
2950
  port = cfg.AllocatePort()
2951
  vgname = cfg.GetVGName()
2952
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2953
                          logical_id=(vgname, names[0]))
2954
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2955
                          logical_id=(vgname, names[1]))
2956
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2957
                          logical_id = (primary, secondary, port),
2958
                          children = [dev_data, dev_meta],
2959
                          iv_name=iv_name)
2960
  return drbd_dev
2961

    
2962

    
2963
def _GenerateDiskTemplate(cfg, template_name,
2964
                          instance_name, primary_node,
2965
                          secondary_nodes, disk_sz, swap_sz,
2966
                          file_storage_dir, file_driver):
2967
  """Generate the entire disk layout for a given template type.
2968

2969
  """
2970
  #TODO: compute space requirements
2971

    
2972
  vgname = cfg.GetVGName()
2973
  if template_name == constants.DT_DISKLESS:
2974
    disks = []
2975
  elif template_name == constants.DT_PLAIN:
2976
    if len(secondary_nodes) != 0:
2977
      raise errors.ProgrammerError("Wrong template configuration")
2978

    
2979
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2980
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2981
                           logical_id=(vgname, names[0]),
2982
                           iv_name = "sda")
2983
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2984
                           logical_id=(vgname, names[1]),
2985
                           iv_name = "sdb")
2986
    disks = [sda_dev, sdb_dev]
2987
  elif template_name == constants.DT_DRBD8:
2988
    if len(secondary_nodes) != 1:
2989
      raise errors.ProgrammerError("Wrong template configuration")
2990
    remote_node = secondary_nodes[0]
2991
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2992
                                       ".sdb_data", ".sdb_meta"])
2993
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2994
                                         disk_sz, names[0:2], "sda")
2995
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2996
                                         swap_sz, names[2:4], "sdb")
2997
    disks = [drbd_sda_dev, drbd_sdb_dev]
2998
  elif template_name == constants.DT_FILE:
2999
    if len(secondary_nodes) != 0:
3000
      raise errors.ProgrammerError("Wrong template configuration")
3001

    
3002
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3003
                                iv_name="sda", logical_id=(file_driver,
3004
                                "%s/sda" % file_storage_dir))
3005
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3006
                                iv_name="sdb", logical_id=(file_driver,
3007
                                "%s/sdb" % file_storage_dir))
3008
    disks = [file_sda_dev, file_sdb_dev]
3009
  else:
3010
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3011
  return disks
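
# Editor's summary of the branches above (descriptive, no new behaviour):
#   diskless -> []
#   plain    -> [LV "sda" (disk_sz), LV "sdb" (swap_sz)]
#   drbd8    -> [DRBD8 "sda" over data+meta LVs, DRBD8 "sdb" over data+meta LVs]
#   file     -> [FILE "sda" at <file_storage_dir>/sda,
#                FILE "sdb" at <file_storage_dir>/sdb]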
3012

    
3013

    
3014
def _GetInstanceInfoText(instance):
3015
  """Compute that text that should be added to the disk's metadata.
3016

3017
  """
3018
  return "originstname+%s" % instance.name
3019

    
3020

    
3021
def _CreateDisks(cfg, instance):
3022
  """Create all disks for an instance.
3023

3024
  This abstracts away some work from AddInstance.
3025

3026
  Args:
3027
    instance: the instance object
3028

3029
  Returns:
3030
    True or False showing the success of the creation process
3031

3032
  """
3033
  info = _GetInstanceInfoText(instance)
3034

    
3035
  if instance.disk_template == constants.DT_FILE:
3036
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3037
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3038
                                              file_storage_dir)
3039

    
3040
    if not result:
3041
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3042
      return False
3043

    
3044
    if not result[0]:
3045
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3046
      return False
3047

    
3048
  for device in instance.disks:
3049
    logger.Info("creating volume %s for instance %s" %
3050
                (device.iv_name, instance.name))
3051
    #HARDCODE
3052
    for secondary_node in instance.secondary_nodes:
3053
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3054
                                        device, False, info):
3055
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3056
                     (device.iv_name, device, secondary_node))
3057
        return False
3058
    #HARDCODE
3059
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3060
                                    instance, device, info):
3061
      logger.Error("failed to create volume %s on primary!" %
3062
                   device.iv_name)
3063
      return False
3064

    
3065
  return True
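
# Editor's note on ordering (descriptive only): secondary nodes are handled
# first with force=False, so only the pieces a secondary needs are created;
# the primary copy is then created via _CreateBlockDevOnPrimary, which (as
# noted in LUReplaceDisks below) always forces creation of the device itself.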
3066

    
3067

    
3068
def _RemoveDisks(instance, cfg):
3069
  """Remove all disks for an instance.
3070

3071
  This abstracts away some work from `AddInstance()` and
3072
  `RemoveInstance()`. Note that in case some of the devices couldn't
3073
  be removed, the removal will continue with the other ones (compare
3074
  with `_CreateDisks()`).
3075

3076
  Args:
3077
    instance: the instance object
3078

3079
  Returns:
3080
    True or False showing the success of the removal process
3081

3082
  """
3083
  logger.Info("removing block devices for instance %s" % instance.name)
3084

    
3085
  result = True
3086
  for device in instance.disks:
3087
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3088
      cfg.SetDiskID(disk, node)
3089
      if not rpc.call_blockdev_remove(node, disk):
3090
        logger.Error("could not remove block device %s on node %s,"
3091
                     " continuing anyway" %
3092
                     (device.iv_name, node))
3093
        result = False
3094

    
3095
  if instance.disk_template == constants.DT_FILE:
3096
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3097
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3098
                                            file_storage_dir):
3099
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3100
      result = False
3101

    
3102
  return result
3103

    
3104

    
3105
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3106
  """Compute disk size requirements in the volume group
3107

3108
  This is currently hard-coded for the two-drive layout.
3109

3110
  """
3111
  # Required free disk space as a function of disk and swap space
3112
  req_size_dict = {
3113
    constants.DT_DISKLESS: None,
3114
    constants.DT_PLAIN: disk_size + swap_size,
3115
    # 256 MB are added for DRBD metadata, 128 MB for each DRBD device
3116
    constants.DT_DRBD8: disk_size + swap_size + 256,
3117
    constants.DT_FILE: None,
3118
  }
3119

    
3120
  if disk_template not in req_size_dict:
3121
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3122
                                 " is unknown" %  disk_template)
3123

    
3124
  return req_size_dict[disk_template]
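
# Editor's worked example (sizes are illustrative, not defaults): for a
# 10240 MB disk and 4096 MB swap the volume group must hold
#   plain: 10240 + 4096       = 14336 MB
#   drbd8: 10240 + 4096 + 256 = 14592 MB  (two 128 MB DRBD metadata LVs)
# while diskless and file-backed templates need no space in the VG (None).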
3125

    
3126

    
3127
class LUCreateInstance(LogicalUnit):
3128
  """Create an instance.
3129

3130
  """
3131
  HPATH = "instance-add"
3132
  HTYPE = constants.HTYPE_INSTANCE
3133
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3134
              "disk_template", "swap_size", "mode", "start", "vcpus",
3135
              "wait_for_sync", "ip_check", "mac"]
3136

    
3137
  def _RunAllocator(self):
3138
    """Run the allocator based on input opcode.
3139

3140
    """
3141
    disks = [{"size": self.op.disk_size, "mode": "w"},
3142
             {"size": self.op.swap_size, "mode": "w"}]
3143
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3144
             "bridge": self.op.bridge}]
3145
    ial = IAllocator(self.cfg, self.sstore,
3146
                     name=self.op.instance_name,
3147
                     disk_template=self.op.disk_template,
3148
                     tags=[],
3149
                     os=self.op.os_type,
3150
                     vcpus=self.op.vcpus,
3151
                     mem_size=self.op.mem_size,
3152
                     disks=disks,
3153
                     nics=nics,
3154
                     mode=constants.IALLOCATOR_MODE_ALLOC)
3155

    
3156
    ial.Run(self.op.iallocator)
3157

    
3158
    if not ial.success:
3159
      raise errors.OpPrereqError("Can't compute nodes using"
3160
                                 " iallocator '%s': %s" % (self.op.iallocator,
3161
                                                           ial.info))
3162
    if len(ial.nodes) != ial.required_nodes:
3163
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3164
                                 " of nodes (%s), required %s" %
3165
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3166
    self.op.pnode = ial.nodes[0]
3167
    logger.ToStdout("Selected nodes for the instance: %s" %
3168
                    (", ".join(ial.nodes),))
3169
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3170
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3171
    if ial.required_nodes == 2:
3172
      self.op.snode = ial.nodes[1]
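
  # Editor's illustration (hypothetical values): for a drbd8 request the
  # allocator input assembled above looks like
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  # and on success ial.nodes is [primary] or [primary, secondary]; only the
  # names returned there end up in self.op.pnode / self.op.snode.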
3173

    
3174
  def BuildHooksEnv(self):
3175
    """Build hooks env.
3176

3177
    This runs on master, primary and secondary nodes of the instance.
3178

3179
    """
3180
    env = {
3181
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3182
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3183
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3184
      "INSTANCE_ADD_MODE": self.op.mode,
3185
      }
3186
    if self.op.mode == constants.INSTANCE_IMPORT:
3187
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3188
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3189
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3190

    
3191
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3192
      primary_node=self.op.pnode,
3193
      secondary_nodes=self.secondaries,
3194
      status=self.instance_status,
3195
      os_type=self.op.os_type,
3196
      memory=self.op.mem_size,
3197
      vcpus=self.op.vcpus,
3198
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3199
    ))
3200

    
3201
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3202
          self.secondaries)
3203
    return env, nl, nl
3204

    
3205

    
3206
  def CheckPrereq(self):
3207
    """Check prerequisites.
3208

3209
    """
3210
    # set optional parameters to none if they don't exist
3211
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3212
                 "iallocator"]:
3213
      if not hasattr(self.op, attr):
3214
        setattr(self.op, attr, None)
3215

    
3216
    if self.op.mode not in (constants.INSTANCE_CREATE,
3217
                            constants.INSTANCE_IMPORT):
3218
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3219
                                 self.op.mode)
3220

    
3221
    if (not self.cfg.GetVGName() and
3222
        self.op.disk_template not in constants.DTS_NOT_LVM):
3223
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3224
                                 " instances")
3225

    
3226
    if self.op.mode == constants.INSTANCE_IMPORT:
3227
      src_node = getattr(self.op, "src_node", None)
3228
      src_path = getattr(self.op, "src_path", None)
3229
      if src_node is None or src_path is None:
3230
        raise errors.OpPrereqError("Importing an instance requires source"
3231
                                   " node and path options")
3232
      src_node_full = self.cfg.ExpandNodeName(src_node)
3233
      if src_node_full is None:
3234
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3235
      self.op.src_node = src_node = src_node_full
3236

    
3237
      if not os.path.isabs(src_path):
3238
        raise errors.OpPrereqError("The source path must be absolute")
3239

    
3240
      export_info = rpc.call_export_info(src_node, src_path)
3241

    
3242
      if not export_info:
3243
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3244

    
3245
      if not export_info.has_section(constants.INISECT_EXP):
3246
        raise errors.ProgrammerError("Corrupted export config")
3247

    
3248
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3249
      if (int(ei_version) != constants.EXPORT_VERSION):
3250
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3251
                                   (ei_version, constants.EXPORT_VERSION))
3252

    
3253
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3254
        raise errors.OpPrereqError("Can't import instance with more than"
3255
                                   " one data disk")
3256

    
3257
      # FIXME: are the old os-es, disk sizes, etc. useful?
3258
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3259
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3260
                                                         'disk0_dump'))
3261
      self.src_image = diskimage
3262
    else: # INSTANCE_CREATE
3263
      if getattr(self.op, "os_type", None) is None:
3264
        raise errors.OpPrereqError("No guest OS specified")
3265

    
3266
    #### instance parameters check
3267

    
3268
    # disk template and mirror node verification
3269
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3270
      raise errors.OpPrereqError("Invalid disk template name")
3271

    
3272
    # instance name verification
3273
    hostname1 = utils.HostInfo(self.op.instance_name)
3274

    
3275
    self.op.instance_name = instance_name = hostname1.name
3276
    instance_list = self.cfg.GetInstanceList()
3277
    if instance_name in instance_list:
3278
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3279
                                 instance_name)
3280

    
3281
    # ip validity checks
3282
    ip = getattr(self.op, "ip", None)
3283
    if ip is None or ip.lower() == "none":
3284
      inst_ip = None
3285
    elif ip.lower() == "auto":
3286
      inst_ip = hostname1.ip
3287
    else:
3288
      if not utils.IsValidIP(ip):
3289
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3290
                                   " like a valid IP" % ip)
3291
      inst_ip = ip
3292
    self.inst_ip = self.op.ip = inst_ip
3293

    
3294
    if self.op.start and not self.op.ip_check:
3295
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3296
                                 " adding an instance in start mode")
3297

    
3298
    if self.op.ip_check:
3299
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3300
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3301
                                   (hostname1.ip, instance_name))
3302

    
3303
    # MAC address verification
3304
    if self.op.mac != "auto":
3305
      if not utils.IsValidMac(self.op.mac.lower()):
3306
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3307
                                   self.op.mac)
3308

    
3309
    # bridge verification
3310
    bridge = getattr(self.op, "bridge", None)
3311
    if bridge is None:
3312
      self.op.bridge = self.cfg.GetDefBridge()
3313
    else:
3314
      self.op.bridge = bridge
3315

    
3316
    # boot order verification
3317
    if self.op.hvm_boot_order is not None:
3318
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3319
        raise errors.OpPrereqError("invalid boot order specified,"
3320
                                   " must be one or more of [acdn]")
3321
    # file storage checks
3322
    if (self.op.file_driver and
3323
        not self.op.file_driver in constants.FILE_DRIVER):
3324
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3325
                                 self.op.file_driver)
3326

    
3327
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3328
        raise errors.OpPrereqError("File storage directory not a relative"
3329
                                   " path")
3330
    #### allocator run
3331

    
3332
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3333
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3334
                                 " node must be given")
3335

    
3336
    if self.op.iallocator is not None:
3337
      self._RunAllocator()
3338

    
3339
    #### node related checks
3340

    
3341
    # check primary node
3342
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3343
    if pnode is None:
3344
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3345
                                 self.op.pnode)
3346
    self.op.pnode = pnode.name
3347
    self.pnode = pnode
3348
    self.secondaries = []
3349

    
3350
    # mirror node verification
3351
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3352
      if getattr(self.op, "snode", None) is None:
3353
        raise errors.OpPrereqError("The networked disk templates need"
3354
                                   " a mirror node")
3355

    
3356
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3357
      if snode_name is None:
3358
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3359
                                   self.op.snode)
3360
      elif snode_name == pnode.name:
3361
        raise errors.OpPrereqError("The secondary node cannot be"
3362
                                   " the primary node.")
3363
      self.secondaries.append(snode_name)
3364

    
3365
    req_size = _ComputeDiskSize(self.op.disk_template,
3366
                                self.op.disk_size, self.op.swap_size)
3367

    
3368
    # Check lv size requirements
3369
    if req_size is not None:
3370
      nodenames = [pnode.name] + self.secondaries
3371
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3372
      for node in nodenames:
3373
        info = nodeinfo.get(node, None)
3374
        if not info:
3375
          raise errors.OpPrereqError("Cannot get current information"
3376
                                     " from node '%s'" % nodeinfo)
3377
        vg_free = info.get('vg_free', None)
3378
        if not isinstance(vg_free, int):
3379
          raise errors.OpPrereqError("Can't compute free disk space on"
3380
                                     " node %s" % node)
3381
        if req_size > info['vg_free']:
3382
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3383
                                     " %d MB available, %d MB required" %
3384
                                     (node, info['vg_free'], req_size))
3385

    
3386
    # os verification
3387
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3388
    if not os_obj:
3389
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3390
                                 " primary node"  % self.op.os_type)
3391

    
3392
    if self.op.kernel_path == constants.VALUE_NONE:
3393
      raise errors.OpPrereqError("Can't set instance kernel to none")
3394

    
3395

    
3396
    # bridge check on primary node
3397
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3398
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3399
                                 " destination node '%s'" %
3400
                                 (self.op.bridge, pnode.name))
3401

    
3402
    if self.op.start:
3403
      self.instance_status = 'up'
3404
    else:
3405
      self.instance_status = 'down'
3406

    
3407
  def Exec(self, feedback_fn):
3408
    """Create and add the instance to the cluster.
3409

3410
    """
3411
    instance = self.op.instance_name
3412
    pnode_name = self.pnode.name
3413

    
3414
    if self.op.mac == "auto":
3415
      mac_address = self.cfg.GenerateMAC()
3416
    else:
3417
      mac_address = self.op.mac
3418

    
3419
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3420
    if self.inst_ip is not None:
3421
      nic.ip = self.inst_ip
3422

    
3423
    ht_kind = self.sstore.GetHypervisorType()
3424
    if ht_kind in constants.HTS_REQ_PORT:
3425
      network_port = self.cfg.AllocatePort()
3426
    else:
3427
      network_port = None
3428

    
3429
    # this is needed because os.path.join does not accept None arguments
3430
    if self.op.file_storage_dir is None:
3431
      string_file_storage_dir = ""
3432
    else:
3433
      string_file_storage_dir = self.op.file_storage_dir
3434

    
3435
    # build the full file storage dir path
3436
    file_storage_dir = os.path.normpath(os.path.join(
3437
                                        self.sstore.GetFileStorageDir(),
3438
                                        string_file_storage_dir, instance))
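    # (editor's example, hypothetical paths) with a cluster file storage dir
    # of /srv/ganeti/file-storage and no extra subdirectory this evaluates to
    #   normpath(join("/srv/ganeti/file-storage", "", "inst1.example.com"))
    #   -> "/srv/ganeti/file-storage/inst1.example.com"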
3439

    
3440

    
3441
    disks = _GenerateDiskTemplate(self.cfg,
3442
                                  self.op.disk_template,
3443
                                  instance, pnode_name,
3444
                                  self.secondaries, self.op.disk_size,
3445
                                  self.op.swap_size,
3446
                                  file_storage_dir,
3447
                                  self.op.file_driver)
3448

    
3449
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3450
                            primary_node=pnode_name,
3451
                            memory=self.op.mem_size,
3452
                            vcpus=self.op.vcpus,
3453
                            nics=[nic], disks=disks,
3454
                            disk_template=self.op.disk_template,
3455
                            status=self.instance_status,
3456
                            network_port=network_port,
3457
                            kernel_path=self.op.kernel_path,
3458
                            initrd_path=self.op.initrd_path,
3459
                            hvm_boot_order=self.op.hvm_boot_order,
3460
                            )
3461

    
3462
    feedback_fn("* creating instance disks...")
3463
    if not _CreateDisks(self.cfg, iobj):
3464
      _RemoveDisks(iobj, self.cfg)
3465
      raise errors.OpExecError("Device creation failed, reverting...")
3466

    
3467
    feedback_fn("adding instance %s to cluster config" % instance)
3468

    
3469
    self.cfg.AddInstance(iobj)
3470

    
3471
    if self.op.wait_for_sync:
3472
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3473
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3474
      # make sure the disks are not degraded (still sync-ing is ok)
3475
      time.sleep(15)
3476
      feedback_fn("* checking mirrors status")
3477
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3478
    else:
3479
      disk_abort = False
3480

    
3481
    if disk_abort:
3482
      _RemoveDisks(iobj, self.cfg)
3483
      self.cfg.RemoveInstance(iobj.name)
3484
      raise errors.OpExecError("There are some degraded disks for"
3485
                               " this instance")
3486

    
3487
    feedback_fn("creating os for instance %s on node %s" %
3488
                (instance, pnode_name))
3489

    
3490
    if iobj.disk_template != constants.DT_DISKLESS:
3491
      if self.op.mode == constants.INSTANCE_CREATE:
3492
        feedback_fn("* running the instance OS create scripts...")
3493
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3494
          raise errors.OpExecError("could not add os for instance %s"
3495
                                   " on node %s" %
3496
                                   (instance, pnode_name))
3497

    
3498
      elif self.op.mode == constants.INSTANCE_IMPORT:
3499
        feedback_fn("* running the instance OS import scripts...")
3500
        src_node = self.op.src_node
3501
        src_image = self.src_image
3502
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3503
                                                src_node, src_image):
3504
          raise errors.OpExecError("Could not import os for instance"
3505
                                   " %s on node %s" %
3506
                                   (instance, pnode_name))
3507
      else:
3508
        # also checked in the prereq part
3509
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3510
                                     % self.op.mode)
3511

    
3512
    if self.op.start:
3513
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3514
      feedback_fn("* starting instance...")
3515
      if not rpc.call_instance_start(pnode_name, iobj, None):
3516
        raise errors.OpExecError("Could not start instance")
3517

    
3518

    
3519
class LUConnectConsole(NoHooksLU):
3520
  """Connect to an instance's console.
3521

3522
  This is somewhat special in that it returns the command line that
3523
  you need to run on the master node in order to connect to the
3524
  console.
3525

3526
  """
3527
  _OP_REQP = ["instance_name"]
3528

    
3529
  def CheckPrereq(self):
3530
    """Check prerequisites.
3531

3532
    This checks that the instance is in the cluster.
3533

3534
    """
3535
    instance = self.cfg.GetInstanceInfo(
3536
      self.cfg.ExpandInstanceName(self.op.instance_name))
3537
    if instance is None:
3538
      raise errors.OpPrereqError("Instance '%s' not known" %
3539
                                 self.op.instance_name)
3540
    self.instance = instance
3541

    
3542
  def Exec(self, feedback_fn):
3543
    """Connect to the console of an instance
3544

3545
    """
3546
    instance = self.instance
3547
    node = instance.primary_node
3548

    
3549
    node_insts = rpc.call_instance_list([node])[node]
3550
    if node_insts is False:
3551
      raise errors.OpExecError("Can't connect to node %s." % node)
3552

    
3553
    if instance.name not in node_insts:
3554
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3555

    
3556
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3557

    
3558
    hyper = hypervisor.GetHypervisor()
3559
    console_cmd = hyper.GetShellCommandForConsole(instance)
3560

    
3561
    # build ssh cmdline
3562
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3563

    
3564

    
3565
class LUReplaceDisks(LogicalUnit):
3566
  """Replace the disks of an instance.
3567

3568
  """
3569
  HPATH = "mirrors-replace"
3570
  HTYPE = constants.HTYPE_INSTANCE
3571
  _OP_REQP = ["instance_name", "mode", "disks"]
3572

    
3573
  def BuildHooksEnv(self):
3574
    """Build hooks env.
3575

3576
    This runs on the master, the primary and all the secondaries.
3577

3578
    """
3579
    env = {
3580
      "MODE": self.op.mode,
3581
      "NEW_SECONDARY": self.op.remote_node,
3582
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3583
      }
3584
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3585
    nl = [
3586
      self.sstore.GetMasterNode(),
3587
      self.instance.primary_node,
3588
      ]
3589
    if self.op.remote_node is not None:
3590
      nl.append(self.op.remote_node)
3591
    return env, nl, nl
3592

    
3593
  def CheckPrereq(self):
3594
    """Check prerequisites.
3595

3596
    This checks that the instance is in the cluster.
3597

3598
    """
3599
    instance = self.cfg.GetInstanceInfo(
3600
      self.cfg.ExpandInstanceName(self.op.instance_name))
3601
    if instance is None:
3602
      raise errors.OpPrereqError("Instance '%s' not known" %
3603
                                 self.op.instance_name)
3604
    self.instance = instance
3605
    self.op.instance_name = instance.name
3606

    
3607
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3608
      raise errors.OpPrereqError("Instance's disk layout is not"
3609
                                 " network mirrored.")
3610

    
3611
    if len(instance.secondary_nodes) != 1:
3612
      raise errors.OpPrereqError("The instance has a strange layout,"
3613
                                 " expected one secondary but found %d" %
3614
                                 len(instance.secondary_nodes))
3615

    
3616
    self.sec_node = instance.secondary_nodes[0]
3617

    
3618
    remote_node = getattr(self.op, "remote_node", None)
3619
    if remote_node is not None:
3620
      remote_node = self.cfg.ExpandNodeName(remote_node)
3621
      if remote_node is None:
3622
        raise errors.OpPrereqError("Node '%s' not known" %
3623
                                   self.op.remote_node)
3624
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3625
    else:
3626
      self.remote_node_info = None
3627
    if remote_node == instance.primary_node:
3628
      raise errors.OpPrereqError("The specified node is the primary node of"
3629
                                 " the instance.")
3630
    elif remote_node == self.sec_node:
3631
      if self.op.mode == constants.REPLACE_DISK_SEC:
3632
        # this is for DRBD8, where we can't execute the same mode of
3633
        # replacement as for drbd7 (no different port allocated)
3634
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3635
                                   " replacement")
3636
      # the user gave the current secondary, switch to
3637
      # 'no-replace-secondary' mode for drbd7
3638
      remote_node = None
3639
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3640
        self.op.mode != constants.REPLACE_DISK_ALL):
3641
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3642
                                 " disks replacement, not individual ones")
3643
    if instance.disk_template == constants.DT_DRBD8:
3644
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3645
          remote_node is not None):
3646
        # switch to replace secondary mode
3647
        self.op.mode = constants.REPLACE_DISK_SEC
3648

    
3649
      if self.op.mode == constants.REPLACE_DISK_ALL:
3650
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3651
                                   " secondary disk replacement, not"
3652
                                   " both at once")
3653
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3654
        if remote_node is not None:
3655
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3656
                                     " the secondary while doing a primary"
3657
                                     " node disk replacement")
3658
        self.tgt_node = instance.primary_node
3659
        self.oth_node = instance.secondary_nodes[0]
3660
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3661
        self.new_node = remote_node # this can be None, in which case
3662
                                    # we don't change the secondary
3663
        self.tgt_node = instance.secondary_nodes[0]
3664
        self.oth_node = instance.primary_node
3665
      else:
3666
        raise errors.ProgrammerError("Unhandled disk replace mode")
3667

    
3668
    for name in self.op.disks:
3669
      if instance.FindDisk(name) is None:
3670
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3671
                                   (name, instance.name))
3672
    self.op.remote_node = remote_node
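
  # Editor's summary of the drbd8 mode handling above (descriptive only):
  #   REPLACE_DISK_PRI -> tgt_node = primary,   oth_node = secondary
  #   REPLACE_DISK_SEC -> tgt_node = secondary, oth_node = primary, and
  #                       new_node is the optional replacement secondary
  #                       (None means keep the current one)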
3673

    
3674
  def _ExecRR1(self, feedback_fn):
3675
    """Replace the disks of an instance.
3676

3677
    """
3678
    instance = self.instance
3679
    iv_names = {}
3680
    # start of work
3681
    if self.op.remote_node is None:
3682
      remote_node = self.sec_node
3683
    else:
3684
      remote_node = self.op.remote_node
3685
    cfg = self.cfg
3686
    for dev in instance.disks:
3687
      size = dev.size
3688
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3689
      names = _GenerateUniqueNames(cfg, lv_names)
3690
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3691
                                       remote_node, size, names)
3692
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3693
      logger.Info("adding new mirror component on secondary for %s" %
3694
                  dev.iv_name)
3695
      #HARDCODE
3696
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3697
                                        new_drbd, False,
3698
                                        _GetInstanceInfoText(instance)):
3699
        raise errors.OpExecError("Failed to create new component on secondary"
3700
                                 " node %s. Full abort, cleanup manually!" %
3701
                                 remote_node)
3702

    
3703
      logger.Info("adding new mirror component on primary")
3704
      #HARDCODE
3705
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3706
                                      instance, new_drbd,
3707
                                      _GetInstanceInfoText(instance)):
3708
        # remove secondary dev
3709
        cfg.SetDiskID(new_drbd, remote_node)
3710
        rpc.call_blockdev_remove(remote_node, new_drbd)
3711
        raise errors.OpExecError("Failed to create volume on primary!"
3712
                                 " Full abort, cleanup manually!!")
3713

    
3714
      # the device exists now
3715
      # call the primary node to add the mirror to md
3716
      logger.Info("adding new mirror component to md")
3717
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3718
                                           [new_drbd]):
3719
        logger.Error("Can't add mirror compoment to md!")
3720
        cfg.SetDiskID(new_drbd, remote_node)
3721
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3722
          logger.Error("Can't rollback on secondary")
3723
        cfg.SetDiskID(new_drbd, instance.primary_node)
3724
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3725
          logger.Error("Can't rollback on primary")
3726
        raise errors.OpExecError("Full abort, cleanup manually!!")
3727

    
3728
      dev.children.append(new_drbd)
3729
      cfg.AddInstance(instance)
3730

    
3731
    # this can fail as the old devices are degraded and _WaitForSync
3732
    # does a combined result over all disks, so we don't check its
3733
    # return value
3734
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3735

    
3736
    # so check manually all the devices
3737
    for name in iv_names:
3738
      dev, child, new_drbd = iv_names[name]
3739
      cfg.SetDiskID(dev, instance.primary_node)
3740
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3741
      if is_degr:
3742
        raise errors.OpExecError("MD device %s is degraded!" % name)
3743
      cfg.SetDiskID(new_drbd, instance.primary_node)
3744
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3745
      if is_degr:
3746
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3747

    
3748
    for name in iv_names:
3749
      dev, child, new_drbd = iv_names[name]
3750
      logger.Info("remove mirror %s component" % name)
3751
      cfg.SetDiskID(dev, instance.primary_node)
3752
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3753
                                              dev, [child]):
3754
        logger.Error("Can't remove child from mirror, aborting"
3755
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3756
        continue
3757

    
3758
      for node in child.logical_id[:2]:
3759
        logger.Info("remove child device on %s" % node)
3760
        cfg.SetDiskID(child, node)
3761
        if not rpc.call_blockdev_remove(node, child):
3762
          logger.Error("Warning: failed to remove device from node %s,"
3763
                       " continuing operation." % node)
3764

    
3765
      dev.children.remove(child)
3766

    
3767
      cfg.AddInstance(instance)
3768

    
3769
  def _ExecD8DiskOnly(self, feedback_fn):
3770
    """Replace a disk on the primary or secondary for dbrd8.
3771

3772
    The algorithm for replace is quite complicated:
3773
      - for each disk to be replaced:
3774
        - create new LVs on the target node with unique names
3775
        - detach old LVs from the drbd device
3776
        - rename old LVs to name_replaced.<time_t>
3777
        - rename new LVs to old LVs
3778
        - attach the new LVs (with the old names now) to the drbd device
3779
      - wait for sync across all devices
3780
      - for each modified disk:
3781
        - remove old LVs (which have the name name_replaced.<time_t>)
3782

3783
    Failures are not very well handled.
3784

3785
    """
3786
    steps_total = 6
3787
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3788
    instance = self.instance
3789
    iv_names = {}
3790
    vgname = self.cfg.GetVGName()
3791
    # start of work
3792
    cfg = self.cfg
3793
    tgt_node = self.tgt_node
3794
    oth_node = self.oth_node
3795

    
3796
    # Step: check device activation
3797
    self.proc.LogStep(1, steps_total, "check device existence")
3798
    info("checking volume groups")
3799
    my_vg = cfg.GetVGName()
3800
    results = rpc.call_vg_list([oth_node, tgt_node])
3801
    if not results:
3802
      raise errors.OpExecError("Can't list volume groups on the nodes")
3803
    for node in oth_node, tgt_node:
3804
      res = results.get(node, False)
3805
      if not res or my_vg not in res:
3806
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3807
                                 (my_vg, node))
3808
    for dev in instance.disks:
3809
      if not dev.iv_name in self.op.disks:
3810
        continue
3811
      for node in tgt_node, oth_node:
3812
        info("checking %s on %s" % (dev.iv_name, node))
3813
        cfg.SetDiskID(dev, node)
3814
        if not rpc.call_blockdev_find(node, dev):
3815
          raise errors.OpExecError("Can't find device %s on node %s" %
3816
                                   (dev.iv_name, node))
3817

    
3818
    # Step: check other node consistency
3819
    self.proc.LogStep(2, steps_total, "check peer consistency")
3820
    for dev in instance.disks:
3821
      if not dev.iv_name in self.op.disks:
3822
        continue
3823
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3824
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3825
                                   oth_node==instance.primary_node):
3826
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3827
                                 " to replace disks on this node (%s)" %
3828
                                 (oth_node, tgt_node))
3829

    
3830
    # Step: create new storage
3831
    self.proc.LogStep(3, steps_total, "allocate new storage")
3832
    for dev in instance.disks:
3833
      if not dev.iv_name in self.op.disks:
3834
        continue
3835
      size = dev.size
3836
      cfg.SetDiskID(dev, tgt_node)
3837
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3838
      names = _GenerateUniqueNames(cfg, lv_names)
3839
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3840
                             logical_id=(vgname, names[0]))
3841
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3842
                             logical_id=(vgname, names[1]))
3843
      new_lvs = [lv_data, lv_meta]
3844
      old_lvs = dev.children
3845
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3846
      info("creating new local storage on %s for %s" %
3847
           (tgt_node, dev.iv_name))
3848
      # since we *always* want to create this LV, we use the
3849
      # _Create...OnPrimary (which forces the creation), even if we
3850
      # are talking about the secondary node
3851
      for new_lv in new_lvs:
3852
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3853
                                        _GetInstanceInfoText(instance)):
3854
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3855
                                   " node '%s'" %
3856
                                   (new_lv.logical_id[1], tgt_node))
3857

    
3858
    # Step: for each lv, detach+rename*2+attach
3859
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3860
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3861
      info("detaching %s drbd from local storage" % dev.iv_name)
3862
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3863
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3864
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3865
      #dev.children = []
3866
      #cfg.Update(instance)
3867

    
3868
      # ok, we created the new LVs, so now we know we have the needed
3869
      # storage; as such, we proceed on the target node to rename
3870
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3871
      # using the assumption that logical_id == physical_id (which in
3872
      # turn is the unique_id on that node)
3873

    
3874
      # FIXME(iustin): use a better name for the replaced LVs
3875
      temp_suffix = int(time.time())
3876
      ren_fn = lambda d, suff: (d.physical_id[0],
3877
                                d.physical_id[1] + "_replaced-%s" % suff)
3878
      # build the rename list based on what LVs exist on the node
3879
      rlist = []
3880
      for to_ren in old_lvs:
3881
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3882
        if find_res is not None: # device exists
3883
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3884

    
3885
      info("renaming the old LVs on the target node")
3886
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3887
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3888
      # now we rename the new LVs to the old LVs
3889
      info("renaming the new LVs on the target node")
3890
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3891
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3892
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3893

    
3894
      for old, new in zip(old_lvs, new_lvs):
3895
        new.logical_id = old.logical_id
3896
        cfg.SetDiskID(new, tgt_node)
3897

    
3898
      for disk in old_lvs:
3899
        disk.logical_id = ren_fn(disk, temp_suffix)
3900
        cfg.SetDiskID(disk, tgt_node)
3901

    
3902
      # now that the new lvs have the old name, we can add them to the device
3903
      info("adding new mirror component on %s" % tgt_node)
3904
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3905
        for new_lv in new_lvs:
3906
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3907
            warning("Can't rollback device %s", hint="manually cleanup unused"
3908
                    " logical volumes")
3909
        raise errors.OpExecError("Can't add local storage to drbd")
3910

    
3911
      dev.children = new_lvs
3912
      cfg.Update(instance)
3913

    
3914
    # Step: wait for sync
3915

    
3916
    # this can fail as the old devices are degraded and _WaitForSync
3917
    # does a combined result over all disks, so we don't check its
3918
    # return value
3919
    self.proc.LogStep(5, steps_total, "sync devices")
3920
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3921

    
3922
    # so check manually all the devices
3923
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3924
      cfg.SetDiskID(dev, instance.primary_node)
3925
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3926
      if is_degr:
3927
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3928

    
3929
    # Step: remove old storage
3930
    self.proc.LogStep(6, steps_total, "removing old storage")
3931
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3932
      info("remove logical volumes for %s" % name)
3933
      for lv in old_lvs:
3934
        cfg.SetDiskID(lv, tgt_node)
3935
        if not rpc.call_blockdev_remove(tgt_node, lv):
3936
          warning("Can't remove old LV", hint="manually remove unused LVs")
3937
          continue
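
  # Editor's illustration of the rename scheme above (hypothetical names):
  # with temp_suffix = 1210000000, the old data LV
  #   xenvg/<id>.sda_data      ->  xenvg/<id>.sda_data_replaced-1210000000
  # and the freshly created LV
  #   xenvg/<new-id>.sda_data  ->  xenvg/<id>.sda_data
  # so the DRBD device keeps its original backing names, and the
  # "_replaced-<time_t>" volumes are the ones removed in step 6.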
3938

    
3939
  def _ExecD8Secondary(self, feedback_fn):
3940
    """Replace the secondary node for drbd8.
3941

3942
    The algorithm for replace is quite complicated:
3943
      - for all disks of the instance:
3944
        - create new LVs on the new node with the same names
3945
        - shutdown the drbd device on the old secondary
3946
        - disconnect the drbd network on the primary
3947
        - create the drbd device on the new secondary
3948
        - network attach the drbd on the primary, using an artifice:
3949
          the drbd code for Attach() will connect to the network if it
3950
          finds a device which is connected to the correct local disks but
3951
          not network enabled
3952
      - wait for sync across all devices
3953
      - remove all disks from the old secondary
3954

3955
    Failures are not very well handled.
3956

3957
    """
3958
    steps_total = 6
3959
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3960
    instance = self.instance
3961
    iv_names = {}
3962
    vgname = self.cfg.GetVGName()
3963
    # start of work
3964
    cfg = self.cfg
3965
    old_node = self.tgt_node
3966
    new_node = self.new_node
3967
    pri_node = instance.primary_node
3968

    
3969
    # Step: check device activation
3970
    self.proc.LogStep(1, steps_total, "check device existence")
3971
    info("checking volume groups")
3972
    my_vg = cfg.GetVGName()
3973
    results = rpc.call_vg_list([pri_node, new_node])
3974
    if not results:
3975
      raise errors.OpExecError("Can't list volume groups on the nodes")
3976
    for node in pri_node, new_node:
3977
      res = results.get(node, False)
3978
      if not res or my_vg not in res:
3979
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3980
                                 (my_vg, node))
3981
    for dev in instance.disks:
3982
      if not dev.iv_name in self.op.disks:
3983
        continue
3984
      info("checking %s on %s" % (dev.iv_name, pri_node))
3985
      cfg.SetDiskID(dev, pri_node)
3986
      if not rpc.call_blockdev_find(pri_node, dev):
3987
        raise errors.OpExecError("Can't find device %s on node %s" %
3988
                                 (dev.iv_name, pri_node))
3989

    
3990
    # Step: check other node consistency
3991
    self.proc.LogStep(2, steps_total, "check peer consistency")
3992
    for dev in instance.disks:
3993
      if not dev.iv_name in self.op.disks:
3994
        continue
3995
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3996
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3997
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3998
                                 " unsafe to replace the secondary" %
3999
                                 pri_node)
4000

    
4001
    # Step: create new storage
4002
    self.proc.LogStep(3, steps_total, "allocate new storage")
4003
    for dev in instance.disks:
4004
      size = dev.size
4005
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4006
      # since we *always* want to create this LV, we use the
4007
      # _Create...OnPrimary (which forces the creation), even if we
4008
      # are talking about the secondary node
4009
      for new_lv in dev.children:
4010
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4011
                                        _GetInstanceInfoText(instance)):
4012
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4013
                                   " node '%s'" %
4014
                                   (new_lv.logical_id[1], new_node))
4015

    
4016
      iv_names[dev.iv_name] = (dev, dev.children)
4017

    
4018
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4019
    for dev in instance.disks:
4020
      size = dev.size
4021
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4022
      # create new devices on new_node
4023
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4024
                              logical_id=(pri_node, new_node,
4025
                                          dev.logical_id[2]),
4026
                              children=dev.children)
4027
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4028
                                        new_drbd, False,
4029
                                      _GetInstanceInfoText(instance)):
4030
        raise errors.OpExecError("Failed to create new DRBD on"
4031
                                 " node '%s'" % new_node)
4032

    
4033
    for dev in instance.disks:
4034
      # we have new devices, shutdown the drbd on the old secondary
4035
      info("shutting down drbd for %s on old node" % dev.iv_name)
4036
      cfg.SetDiskID(dev, old_node)
4037
      if not rpc.call_blockdev_shutdown(old_node, dev):
4038
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4039
                hint="Please cleanup this device manually as soon as possible")
4040

    
4041
    info("detaching primary drbds from the network (=> standalone)")
4042
    done = 0
4043
    for dev in instance.disks:
4044
      cfg.SetDiskID(dev, pri_node)
4045
      # set the physical (unique in bdev terms) id to None, meaning
4046
      # detach from network
4047
      dev.physical_id = (None,) * len(dev.physical_id)
4048
      # and 'find' the device, which will 'fix' it to match the
4049
      # standalone state
4050
      if rpc.call_blockdev_find(pri_node, dev):
4051
        done += 1
4052
      else:
4053
        warning("Failed to detach drbd %s from network, unusual case" %
4054
                dev.iv_name)
4055

    
4056
    if not done:
4057
      # no detaches succeeded (very unlikely)
4058
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4059

    
4060
    # if we managed to detach at least one, we update all the disks of
4061
    # the instance to point to the new secondary
4062
    info("updating instance configuration")
4063
    for dev in instance.disks:
4064
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4065
      cfg.SetDiskID(dev, pri_node)
4066
    cfg.Update(instance)
4067

    
4068
    # and now perform the drbd attach
4069
    info("attaching primary drbds to new secondary (standalone => connected)")
4070
    failures = []
4071
    for dev in instance.disks:
4072
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4073
      # since the attach is smart, it's enough to 'find' the device,
4074
      # it will automatically activate the network, if the physical_id
4075
      # is correct
4076
      cfg.SetDiskID(dev, pri_node)
4077
      if not rpc.call_blockdev_find(pri_node, dev):
4078
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4079
                "please do a gnt-instance info to see the status of disks")
4080

    
4081
    # this can fail as the old devices are degraded and _WaitForSync
4082
    # does a combined result over all disks, so we don't check its
4083
    # return value
4084
    self.proc.LogStep(5, steps_total, "sync devices")
4085
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4086

    
4087
    # so check manually all the devices
4088
    for name, (dev, old_lvs) in iv_names.iteritems():
4089
      cfg.SetDiskID(dev, pri_node)
4090
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4091
      if is_degr:
4092
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4093

    
4094
    self.proc.LogStep(6, steps_total, "removing old storage")
4095
    for name, (dev, old_lvs) in iv_names.iteritems():
4096
      info("remove logical volumes for %s" % name)
4097
      for lv in old_lvs:
4098
        cfg.SetDiskID(lv, old_node)
4099
        if not rpc.call_blockdev_remove(old_node, lv):
4100
          warning("Can't remove LV on old secondary",
4101
                  hint="Cleanup stale volumes by hand")
4102

    
4103
  def Exec(self, feedback_fn):
4104
    """Execute disk replacement.
4105

4106
    This dispatches the disk replacement to the appropriate handler.
4107

4108
    """
4109
    instance = self.instance
4110
    if instance.disk_template == constants.DT_REMOTE_RAID1:
4111
      fn = self._ExecRR1
4112
    elif instance.disk_template == constants.DT_DRBD8:
4113
      if self.op.remote_node is None:
4114
        fn = self._ExecD8DiskOnly
4115
      else:
4116
        fn = self._ExecD8Secondary
4117
    else:
4118
      raise errors.ProgrammerError("Unhandled disk replacement case")
4119
    return fn(feedback_fn)
4120

    
4121

    
4122
class LUQueryInstanceData(NoHooksLU):
4123
  """Query runtime instance data.
4124

4125
  """
4126
  _OP_REQP = ["instances"]
4127

    
4128
  def CheckPrereq(self):
4129
    """Check prerequisites.
4130

4131
    This only checks the optional instance list against the existing names.
4132

4133
    """
4134
    if not isinstance(self.op.instances, list):
4135
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4136
    if self.op.instances:
4137
      self.wanted_instances = []
4138
      names = self.op.instances
4139
      for name in names:
4140
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4141
        if instance is None:
4142
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4143
        self.wanted_instances.append(instance)
4144
    else:
4145
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4146
                               in self.cfg.GetInstanceList()]
4147
    return
4148

    
4149

    
4150
  def _ComputeDiskStatus(self, instance, snode, dev):
4151
    """Compute block device status.
4152

4153
    """
4154
    self.cfg.SetDiskID(dev, instance.primary_node)
4155
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4156
    if dev.dev_type in constants.LDS_DRBD:
4157
      # we change the snode then (otherwise we use the one passed in)
4158
      if dev.logical_id[0] == instance.primary_node:
4159
        snode = dev.logical_id[1]
4160
      else:
4161
        snode = dev.logical_id[0]
4162

    
4163
    if snode:
4164
      self.cfg.SetDiskID(dev, snode)
4165
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4166
    else:
4167
      dev_sstatus = None
4168

    
4169
    if dev.children:
4170
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4171
                      for child in dev.children]
4172
    else:
4173
      dev_children = []
4174

    
4175
    data = {
4176
      "iv_name": dev.iv_name,
4177
      "dev_type": dev.dev_type,
4178
      "logical_id": dev.logical_id,
4179
      "physical_id": dev.physical_id,
4180
      "pstatus": dev_pstatus,
4181
      "sstatus": dev_sstatus,
4182
      "children": dev_children,
4183
      }
4184

    
4185
    return data
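
  # Editor's note: pstatus/sstatus above are the raw results of
  # rpc.call_blockdev_find() on the primary and secondary node respectively;
  # elsewhere in this module element [5] of that result is read as the
  # "degraded" flag.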
4186

    
4187
  def Exec(self, feedback_fn):
4188
    """Gather and return data"""
4189
    result = {}
4190
    for instance in self.wanted_instances:
4191
      remote_info = rpc.call_instance_info(instance.primary_node,
4192
                                                instance.name)
4193
      if remote_info and "state" in remote_info:
4194
        remote_state = "up"
4195
      else:
4196
        remote_state = "down"
4197
      if instance.status == "down":
4198
        config_state = "down"
4199
      else:
4200
        config_state = "up"
4201

    
4202
      disks = [self._ComputeDiskStatus(instance, None, device)
4203
               for device in instance.disks]
4204

    
4205
      idict = {
4206
        "name": instance.name,
4207
        "config_state": config_state,
4208
        "run_state": remote_state,
4209
        "pnode": instance.primary_node,
4210
        "snodes": instance.secondary_nodes,
4211
        "os": instance.os,
4212
        "memory": instance.memory,
4213
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4214
        "disks": disks,
4215
        "network_port": instance.network_port,
4216
        "vcpus": instance.vcpus,
4217
        "kernel_path": instance.kernel_path,
4218
        "initrd_path": instance.initrd_path,
4219
        "hvm_boot_order": instance.hvm_boot_order,
4220
        }
4221

    
4222
      result[instance.name] = idict
4223

    
4224
    return result
4225

    
4226

    
4227
class LUSetInstanceParams(LogicalUnit):
4228
  """Modifies an instances's parameters.
4229

4230
  """
4231
  HPATH = "instance-modify"
4232
  HTYPE = constants.HTYPE_INSTANCE
4233
  _OP_REQP = ["instance_name"]
4234

    
4235
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_params.count(None) == len(all_params):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
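    # The accepted letters follow the usual Xen HVM convention (illustrative
    # note, not enforced beyond the check below): a=floppy, c=hard disk,
    # d=CD-ROM, n=network; e.g. "dc" tries the CD-ROM before the disk.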
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

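    # The value handed back to the caller is a list of (parameter, new value)
    # pairs; roughly, with hypothetical values, something like
    #   [("mem", 512), ("bridge", "xen-br0")]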
    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
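    # Rough sketch of the return value, with hypothetical names:
    #   {"node1.example.com": ["instance1.example.com"],
    #    "node2.example.com": []}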
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
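    # Rough sketch of the result, with hypothetical data: a list of
    # (path, tag) pairs such as
    #   [("/cluster", "prod"), ("/instances/instance1.example.com", "prod")]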
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _KEYS = [
    "mode", "name",
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]

  def __init__(self, cfg, sstore, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = self.name = None
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    for key in kwargs:
      if key not in self._KEYS:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in self._KEYS:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      # we don't have job IDs
      }

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free',
                   'vg_size', 'vg_free']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": utils.TryConvert(int, remote_info['memory_total']),
        "free_memory": utils.TryConvert(int, remote_info['memory_free']),
        "total_disk": utils.TryConvert(int, remote_info['vg_size']),
        "free_disk": utils.TryConvert(int, remote_info['vg_free']),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    i_list = cfg.GetInstanceList()
    for iname in i_list:
      iinfo = cfg.GetInstanceInfo(iname)
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iname] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
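    # Once serialized for the external script this adds, roughly (values are
    # hypothetical):
    #   "request": {"type": "allocate", "name": "instance1.example.com",
    #               "memory": 512, "vcpus": 1, "disk_space_total": 10240,
    #               "required_nodes": 2, ...}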

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "nodes": list(instance.secondary_nodes),
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      raise errors.OpExecError("Can't find allocator '%s'" % name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, data)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        raise errors.OpExecError("Instance allocator call failed: %s,"
                                 " output: %s" %
                                 (result.fail_reason, result.output))
    finally:
      os.unlink(fin_name)
    self.out_text = result.stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
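    # A well-formed reply from the external allocator, as serialized text,
    # would look roughly like (hypothetical node names):
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}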
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the test direction and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=self.op.mode,
                     name=self.op.name,
                     mem_size=self.op.mem_size,
                     disks=self.op.disks,
                     disk_template=self.op.disk_template,
                     os=self.op.os,
                     tags=self.op.tags,
                     nics=self.op.nics,
                     vcpus=self.op.vcpus,
                     )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result